From ce7f760586459d37ce74e5d0b7d5edf96ad5e77b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 14:30:11 -0600 Subject: [PATCH 01/12] Partial upgrade to DataFusion 40.0.0 --- native/Cargo.lock | 104 ++++++++++-------- native/core/Cargo.toml | 26 ++--- .../execution/datafusion/expressions/abs.rs | 2 +- .../execution/datafusion/expressions/avg.rs | 2 +- .../execution/datafusion/operators/expand.rs | 6 + .../core/src/execution/datafusion/planner.rs | 83 ++++++++------ .../execution/datafusion/shuffle_writer.rs | 4 + native/core/src/execution/operators/copy.rs | 4 + native/core/src/execution/operators/scan.rs | 4 + 9 files changed, 142 insertions(+), 93 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index ccb6433d9..94a278437 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -114,8 +114,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6127ea5e585a12ec9f742232442828ebaf264dfa5eefdd71282376c599562b77" dependencies = [ "arrow-arith", "arrow-array", @@ -134,8 +135,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7add7f39210b7d726e2a8efc0083e7bf06e8f2d15bdb4896b564dce4410fbf5d" dependencies = [ "arrow-array", "arrow-buffer", @@ -148,8 +150,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81c16ec702d3898c2f5cfdc148443c6cd7dbe5bac28399859eb0a3d38f072827" dependencies = [ "ahash", "arrow-buffer", @@ -164,8 +167,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cae6970bab043c4fbc10aee1660ceb5b306d0c42c8cc5f6ae564efcd9759b663" dependencies = [ "bytes", "half", @@ -174,8 +178,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c7ef44f26ef4f8edc392a048324ed5d757ad09135eff6d5509e6450d39e0398" dependencies = [ "arrow-array", "arrow-buffer", @@ -194,8 +199,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f843490bd258c5182b66e888161bb6f198f49f3792f7c7f98198b924ae0f564" dependencies = [ "arrow-array", "arrow-buffer", @@ -212,8 +218,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a769666ffac256dd301006faca1ca553d0ae7cffcf4cd07095f73f95eb226514" dependencies = [ "arrow-buffer", "arrow-schema", @@ -223,8 +230,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf9c3fb57390a1af0b7bb3b5558c1ee1f63905f3eccf49ae7676a8d1e6e5a72" dependencies = [ "arrow-array", "arrow-buffer", @@ -237,8 +245,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "654e7f3724176b66ddfacba31af397c48e106fbe4d281c8144e7d237df5acfd7" dependencies = [ "arrow-array", "arrow-buffer", @@ -256,8 +265,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8008370e624e8e3c68174faaf793540287106cfda8ad1da862fdc53d8e096b4" dependencies = [ "arrow-array", "arrow-buffer", @@ -270,8 +280,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca5e3a6b7fda8d9fe03f3b18a2d946354ea7f3c8e4076dbdb502ad50d9d44824" dependencies = [ "ahash", "arrow-array", @@ -284,16 +295,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab1c12b40e29d9f3b699e0203c2a73ba558444c05e388a4377208f8f9c97eee" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "arrow-select" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e80159088ffe8c48965cb9b1a7c968b2729f29f37363df7eca177fc3281fe7c3" dependencies = [ "ahash", "arrow-array", @@ -305,8 +318,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fd04a6ea7de183648edbcb7a6dd925bbd04c210895f6384c780e27a9b54afcd" dependencies = [ "arrow-array", "arrow-buffer", @@ -791,7 +805,7 @@ dependencies = [ [[package]] name = "datafusion" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -898,7 +912,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -918,7 +932,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "tokio", ] @@ -926,7 +940,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "arrow", "chrono", @@ -946,7 +960,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -964,7 +978,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "arrow", "base64", @@ -974,7 +988,6 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "datafusion-physical-expr", "hashbrown", "hex", "itertools 0.12.1", @@ -990,7 +1003,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -1007,7 +1020,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "arrow", "async-trait", @@ -1019,13 +1032,14 @@ dependencies = [ "indexmap", "itertools 0.12.1", "log", + "paste", "regex-syntax", ] [[package]] name = "datafusion-physical-expr" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -1039,7 +1053,6 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "datafusion-functions-aggregate", "datafusion-physical-expr-common", "half", "hashbrown", @@ -1055,18 +1068,20 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ + "ahash", "arrow", "datafusion-common", "datafusion-expr", + "hashbrown", "rand", ] [[package]] name = "datafusion-physical-plan" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "ahash", "arrow", @@ -1099,7 +1114,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "39.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569" +source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" dependencies = [ "arrow", "arrow-array", @@ -2037,8 +2052,9 @@ dependencies = [ [[package]] name = "parquet" -version = "52.0.0" -source = "git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f22ba0d95db56dde8685e3fadcb915cdaadda31ab8abbe3ff7f0ad1ef333267" dependencies = [ "ahash", "bytes", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c3e924a44..365298e15 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -33,13 +33,13 @@ include = [ [dependencies] parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate -arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1", features = ["prettyprint", "ffi", "chrono-tz"] } -arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" } -arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" } -arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" } -arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" } -arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" } -parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1", default-features = false, features = ["experimental"] } +arrow = { version = "52.1.0", features = ["prettyprint", "ffi", "chrono-tz"] } +arrow-array = { version = "52.1.0" } +arrow-buffer = { version = "52.1.0" } +arrow-data = { version = "52.1.0" } +arrow-schema = { version = "52.1.0" } +arrow-string = { version = "52.1.0" } +parquet = { version = "52.1.0", default-features = false, features = ["experimental"] } half = { version = "2.4.1", default-features = false } futures = "0.3.28" mimalloc = { version = "*", default-features = false, optional = true } @@ -71,12 +71,12 @@ itertools = "0.11.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } paste = "1.0.14" -datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1" } -datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["crypto_expressions"] } -datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false } -datafusion-physical-expr-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false } -datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false } +datafusion-common = { git = "https://github.com/apache/datafusion.git", branch = "main" } +datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", branch = "main", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", branch = "main", features = ["crypto_expressions"] } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/native/core/src/execution/datafusion/expressions/abs.rs b/native/core/src/execution/datafusion/expressions/abs.rs index 4eb8c7c1e..a037e5cbc 100644 --- a/native/core/src/execution/datafusion/expressions/abs.rs +++ b/native/core/src/execution/datafusion/expressions/abs.rs @@ -37,7 +37,7 @@ impl CometAbsFunc { pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result { if let EvalMode::Legacy | EvalMode::Ansi = eval_mode { Ok(Self { - inner_abs_func: math::abs().inner(), + inner_abs_func: math::abs().inner().clone(), eval_mode, data_type_name, }) diff --git a/native/core/src/execution/datafusion/expressions/avg.rs b/native/core/src/execution/datafusion/expressions/avg.rs index 1ff276e5d..3c8865bd1 100644 --- a/native/core/src/execution/datafusion/expressions/avg.rs +++ b/native/core/src/execution/datafusion/expressions/avg.rs @@ -47,7 +47,7 @@ pub struct Avg { impl Avg { /// Create a new AVG aggregate function pub fn new(expr: Arc, name: impl Into, data_type: DataType) -> Self { - let result_data_type = avg_return_type(&data_type).unwrap(); + let result_data_type = avg_return_type("avg", &data_type).unwrap(); Self { name: name.into(), diff --git a/native/core/src/execution/datafusion/operators/expand.rs b/native/core/src/execution/datafusion/operators/expand.rs index 5285dfb46..2abcae4cb 100644 --- a/native/core/src/execution/datafusion/operators/expand.rs +++ b/native/core/src/execution/datafusion/operators/expand.rs @@ -92,6 +92,8 @@ impl ExecutionPlan for CometExpandExec { self } + + fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -126,6 +128,10 @@ impl ExecutionPlan for CometExpandExec { fn properties(&self) -> &PlanProperties { &self.cache } + + fn name(&self) -> &str { + "CometExpandExec" + } } pub struct ExpandStream { diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 40515c0c4..c90682044 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -20,6 +20,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use datafusion::prelude::{BitAnd, BitOr, BitXor}; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; use datafusion::{ @@ -27,12 +28,15 @@ use datafusion::{ common::DataFusionError, execution::FunctionRegistry, logical_expr::Operator as DataFusionOperator, + functions_aggregate::count::Count, + functions_aggregate::sum::Sum, + functions_aggregate::first_last::{FirstValue, LastValue}, physical_expr::{ execution_props::ExecutionProps, expressions::{ - in_list, BinaryExpr, BitAnd, BitOr, BitXor, CaseExpr, CastExpr, Column, Count, - FirstValue, IsNotNullExpr, IsNullExpr, LastValue, Literal as DataFusionLiteral, Max, - Min, NotExpr, Sum, + in_list, BinaryExpr, CaseExpr, CastExpr, Column, + IsNotNullExpr, IsNullExpr, Literal as DataFusionLiteral, Max, + Min, NotExpr }, AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, @@ -647,7 +651,7 @@ impl PhysicalPlanner { let left = self.create_expr(left, input_schema.clone())?; let right = self.create_expr(right, input_schema.clone())?; match ( - op, + &op, left.data_type(&input_schema), right.data_type(&input_schema), ) { @@ -672,7 +676,7 @@ impl PhysicalPlanner { DataType::Decimal256(p2, s2), EvalMode::Legacy, )); - let child = Arc::new(BinaryExpr::new(left, op, right)); + let child = Arc::new(BinaryExpr::new(left, op.clone(), right)); Ok(Arc::new(Cast::new_without_timezone( child, data_type, @@ -1203,16 +1207,17 @@ impl PhysicalPlanner { ) -> Result, ExecutionError> { match spark_expr.expr_struct.as_ref().unwrap() { AggExprStruct::Count(expr) => { - let children = expr - .children - .iter() - .map(|child| self.create_expr(child, schema.clone())) - .collect::, _>>()?; - Ok(Arc::new(Count::new_with_multiple_exprs( - children, - "count", - DataType::Int64, - ))) + // let children = expr + // .children + // .iter() + // .map(|child| self.create_expr(child, schema.clone())) + // .collect::, _>>()?; + // Ok(Arc::new(Count::new_with_multiple_exprs( + // children, + // "count", + // DataType::Int64, + // ))) + todo!() } AggExprStruct::Min(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; @@ -1236,7 +1241,8 @@ impl PhysicalPlanner { // cast to the result data type of SUM if necessary, we should not expect // a cast failure since it should have already been checked at Spark side let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - Ok(Arc::new(Sum::new(child, "sum", datatype))) + // Ok(Arc::new(Sum::new(child, "sum", datatype))) + todo!() } } } @@ -1264,30 +1270,37 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); - create_aggregate_expr(&func, &[child], &[], &[], &schema, "first", false, false) - .map_err(|e| e.into()) + // create_aggregate_expr(&func, &[child], &[], &[], &schema, "first", false, false) + // .map_err(|e| e.into()) + + todo!() } AggExprStruct::Last(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new()); - create_aggregate_expr(&func, &[child], &[], &[], &schema, "last", false, false) - .map_err(|e| e.into()) + // create_aggregate_expr(&func, &[child], &[], &[], &schema, "last", false, false) + // .map_err(|e| e.into()) + + todo!() } AggExprStruct::BitAndAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - Ok(Arc::new(BitAnd::new(child, "bit_and", datatype))) + // Ok(Arc::new(BitAnd::new(child, "bit_and", datatype))) + todo!() } AggExprStruct::BitOrAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - Ok(Arc::new(BitOr::new(child, "bit_or", datatype))) + // Ok(Arc::new(BitOr::new(child, "bit_or", datatype))) + todo!() } AggExprStruct::BitXorAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - Ok(Arc::new(BitXor::new(child, "bit_xor", datatype))) + // Ok(Arc::new(BitXor::new(child, "bit_xor", datatype))) + todo!() } AggExprStruct::Covariance(expr) => { let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; @@ -1479,17 +1492,19 @@ impl PhysicalPlanner { let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound); - datafusion::physical_plan::windows::create_window_expr( - &window_func, - window_func_name, - &window_args, - partition_by, - sort_exprs, - window_frame.into(), - &input_schema, - false, // TODO: Ignore nulls - ) - .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + // datafusion::physical_plan::windows::create_window_expr( + // &window_func, + // window_func_name, + // &window_args, + // partition_by, + // sort_exprs, + // window_frame.into(), + // &input_schema, + // false, // TODO: Ignore nulls + // ) + // .map_err(|e| ExecutionError::DataFusionError(e.to_string())) + + todo!() } fn process_agg_func(agg_func: &AggExpr) -> Result<(String, Vec), ExecutionError> { diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 5afc9a53e..6e59ce53a 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -160,6 +160,10 @@ impl ExecutionPlan for ShuffleWriterExec { fn properties(&self) -> &PlanProperties { &self.cache } + + fn name(&self) -> &str { + "ShuffleWriterExec" + } } impl ShuffleWriterExec { diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index d011b3cb2..68c91aafc 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -126,6 +126,10 @@ impl ExecutionPlan for CopyExec { fn properties(&self) -> &PlanProperties { &self.cache } + + fn name(&self) -> &str { + "CopyExec" + } } struct CopyStream { diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index de5328210..68dd773cf 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -270,6 +270,10 @@ impl ExecutionPlan for ScanExec { fn properties(&self) -> &PlanProperties { &self.cache } + + fn name(&self) -> &str { + "ScanExec" + } } impl DisplayAs for ScanExec { From 51fe63a22287f8fdbde802c0c13f05848fc3f445 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 15:23:42 -0600 Subject: [PATCH 02/12] fix --- native/Cargo.lock | 24 +++---- native/core/Cargo.toml | 12 ++-- .../execution/datafusion/operators/expand.rs | 2 - .../core/src/execution/datafusion/planner.rs | 66 ++++++++++--------- 4 files changed, 54 insertions(+), 50 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 94a278437..e09ddaa70 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -805,7 +805,7 @@ dependencies = [ [[package]] name = "datafusion" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -912,7 +912,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -932,7 +932,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "tokio", ] @@ -940,7 +940,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "arrow", "chrono", @@ -960,7 +960,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -978,7 +978,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "arrow", "base64", @@ -1003,7 +1003,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -1020,7 +1020,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "arrow", "async-trait", @@ -1039,7 +1039,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -1068,7 +1068,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -1081,7 +1081,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "ahash", "arrow", @@ -1114,7 +1114,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?branch=main#894a8794d11be148cc60db8eac16f105a74d96b1" +source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" dependencies = [ "arrow", "arrow-array", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 365298e15..07337722b 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -71,12 +71,12 @@ itertools = "0.11.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } paste = "1.0.14" -datafusion-common = { git = "https://github.com/apache/datafusion.git", branch = "main" } -datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", branch = "main", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", branch = "main", features = ["crypto_expressions"] } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } -datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", branch = "main", default-features = false } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" } +datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", features = ["crypto_expressions"] } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/native/core/src/execution/datafusion/operators/expand.rs b/native/core/src/execution/datafusion/operators/expand.rs index 2abcae4cb..67171212f 100644 --- a/native/core/src/execution/datafusion/operators/expand.rs +++ b/native/core/src/execution/datafusion/operators/expand.rs @@ -92,8 +92,6 @@ impl ExecutionPlan for CometExpandExec { self } - - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index c90682044..0c2868f77 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -20,23 +20,23 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use datafusion::prelude::{BitAnd, BitOr, BitXor}; +use datafusion::functions_aggregate::count::{count, count_udaf}; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; +use datafusion::prelude::{BitAnd, BitOr, BitXor}; use datafusion::{ arrow::{compute::SortOptions, datatypes::SchemaRef}, common::DataFusionError, execution::FunctionRegistry, - logical_expr::Operator as DataFusionOperator, functions_aggregate::count::Count, - functions_aggregate::sum::Sum, functions_aggregate::first_last::{FirstValue, LastValue}, + functions_aggregate::sum::Sum, + logical_expr::Operator as DataFusionOperator, physical_expr::{ execution_props::ExecutionProps, expressions::{ - in_list, BinaryExpr, CaseExpr, CastExpr, Column, - IsNotNullExpr, IsNullExpr, Literal as DataFusionLiteral, Max, - Min, NotExpr + in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, + Literal as DataFusionLiteral, Max, Min, NotExpr, }, AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr, }, @@ -1207,17 +1207,24 @@ impl PhysicalPlanner { ) -> Result, ExecutionError> { match spark_expr.expr_struct.as_ref().unwrap() { AggExprStruct::Count(expr) => { - // let children = expr - // .children - // .iter() - // .map(|child| self.create_expr(child, schema.clone())) - // .collect::, _>>()?; - // Ok(Arc::new(Count::new_with_multiple_exprs( - // children, - // "count", - // DataType::Int64, - // ))) - todo!() + let children = expr + .children + .iter() + .map(|child| self.create_expr(child, schema.clone())) + .collect::, _>>()?; + + create_aggregate_expr( + &count_udaf(), + &children, + &[], + &[], + &[], + schema.as_ref(), + "", + false, + false, + ) + .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } AggExprStruct::Min(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; @@ -1492,19 +1499,18 @@ impl PhysicalPlanner { let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound); - // datafusion::physical_plan::windows::create_window_expr( - // &window_func, - // window_func_name, - // &window_args, - // partition_by, - // sort_exprs, - // window_frame.into(), - // &input_schema, - // false, // TODO: Ignore nulls - // ) - // .map_err(|e| ExecutionError::DataFusionError(e.to_string())) - - todo!() + datafusion::physical_plan::windows::create_window_expr( + &window_func, + window_func_name, + &window_args, + &[], + partition_by, + sort_exprs, + window_frame.into(), + &input_schema, + false, // TODO: Ignore nulls + ) + .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } fn process_agg_func(agg_func: &AggExpr) -> Result<(String, Vec), ExecutionError> { From 7e947f7beca59e8ee9b7f69dd356921acfcc5f95 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 15:32:06 -0600 Subject: [PATCH 03/12] implement more udaf --- .../core/src/execution/datafusion/planner.rs | 93 +++++++++++++++---- 1 file changed, 75 insertions(+), 18 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 0c2868f77..ce37d06df 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -20,7 +20,10 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf, bit_xor_udaf}; use datafusion::functions_aggregate::count::{count, count_udaf}; +use datafusion::functions_aggregate::first_last::last_value_udaf; +use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; use datafusion::prelude::{BitAnd, BitOr, BitXor}; @@ -1248,8 +1251,18 @@ impl PhysicalPlanner { // cast to the result data type of SUM if necessary, we should not expect // a cast failure since it should have already been checked at Spark side let child = Arc::new(CastExpr::new(child, datatype.clone(), None)); - // Ok(Arc::new(Sum::new(child, "sum", datatype))) - todo!() + create_aggregate_expr( + &sum_udaf(), + &[child], + &[], + &[], + &[], + schema.as_ref(), + "", + false, + false, + ) + .map_err(|e| ExecutionError::DataFusionError(e.to_string())) } } } @@ -1276,38 +1289,82 @@ impl PhysicalPlanner { AggExprStruct::First(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); - - // create_aggregate_expr(&func, &[child], &[], &[], &schema, "first", false, false) - // .map_err(|e| e.into()) - - todo!() + create_aggregate_expr( + &func, + &[child], + &[], + &[], + &[], + &schema, + "first", + false, + false, + ) + .map_err(|e| e.into()) } AggExprStruct::Last(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new()); - - // create_aggregate_expr(&func, &[child], &[], &[], &schema, "last", false, false) - // .map_err(|e| e.into()) - - todo!() + create_aggregate_expr( + &func, + &[child], + &[], + &[], + &[], + &schema, + "last", + false, + false, + ) + .map_err(|e| e.into()) } AggExprStruct::BitAndAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - // Ok(Arc::new(BitAnd::new(child, "bit_and", datatype))) - todo!() + create_aggregate_expr( + &bit_and_udaf(), + &[child], + &[], + &[], + &[], + &schema, + "bit_and", + false, + false, + ) + .map_err(|e| e.into()) } AggExprStruct::BitOrAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - // Ok(Arc::new(BitOr::new(child, "bit_or", datatype))) - todo!() + create_aggregate_expr( + &bit_or_udaf(), + &[child], + &[], + &[], + &[], + &schema, + "bit_and", + false, + false, + ) + .map_err(|e| e.into()) } AggExprStruct::BitXorAgg(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); - // Ok(Arc::new(BitXor::new(child, "bit_xor", datatype))) - todo!() + create_aggregate_expr( + &bit_xor_udaf(), + &[child], + &[], + &[], + &[], + &schema, + "bit_and", + false, + false, + ) + .map_err(|e| e.into()) } AggExprStruct::Covariance(expr) => { let child1 = self.create_expr(expr.child1.as_ref().unwrap(), schema.clone())?; From 921f64ecfe0e822562698defd72556afb83e4b7d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 15:41:44 -0600 Subject: [PATCH 04/12] update bitwise agg --- native/core/src/execution/datafusion/planner.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index ce37d06df..d2723ab50 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1319,8 +1319,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitAndAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; - let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); + let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; create_aggregate_expr( &bit_and_udaf(), &[child], @@ -1335,8 +1334,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitOrAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; - let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); + let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; create_aggregate_expr( &bit_or_udaf(), &[child], @@ -1351,8 +1349,7 @@ impl PhysicalPlanner { .map_err(|e| e.into()) } AggExprStruct::BitXorAgg(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?; - let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); + let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?; create_aggregate_expr( &bit_xor_udaf(), &[child], From 8b4cf3a7b400130b8cea48899314a85ba8f90efe Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 15:55:42 -0600 Subject: [PATCH 05/12] add func names --- native/core/src/execution/datafusion/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d2723ab50..aab3b2814 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1223,7 +1223,7 @@ impl PhysicalPlanner { &[], &[], schema.as_ref(), - "", + "count", false, false, ) @@ -1258,7 +1258,7 @@ impl PhysicalPlanner { &[], &[], schema.as_ref(), - "", + "sum", false, false, ) From 04cfe12008539bfcb5e1ea4f3b8ce43c04c0a77d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 16:11:16 -0600 Subject: [PATCH 06/12] remove unused imports --- native/core/src/execution/datafusion/planner.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index aab3b2814..9135842f0 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -21,19 +21,15 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf, bit_xor_udaf}; -use datafusion::functions_aggregate::count::{count, count_udaf}; -use datafusion::functions_aggregate::first_last::last_value_udaf; +use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::physical_plan::windows::BoundedWindowAggExec; use datafusion::physical_plan::InputOrderMode; -use datafusion::prelude::{BitAnd, BitOr, BitXor}; use datafusion::{ arrow::{compute::SortOptions, datatypes::SchemaRef}, common::DataFusionError, execution::FunctionRegistry, - functions_aggregate::count::Count, functions_aggregate::first_last::{FirstValue, LastValue}, - functions_aggregate::sum::Sum, logical_expr::Operator as DataFusionOperator, physical_expr::{ execution_props::ExecutionProps, From a3404459544e1ba1243e756664467ebef84c8aca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 16:18:28 -0600 Subject: [PATCH 07/12] remove arrow-string dep --- native/core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 98be0847a..678714cbb 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -37,7 +37,6 @@ arrow-array = { version = "52.1.0" } arrow-buffer = { version = "52.1.0" } arrow-data = { version = "52.1.0" } arrow-schema = { version = "52.1.0" } -arrow-string = { version = "52.1.0" } parquet = { version = "52.1.0", default-features = false, features = ["experimental"] } half = { version = "2.4.1", default-features = false } futures = "0.3.28" From ad31c79d39a4737f248b71e447d68d756ebb67e0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 8 Jul 2024 16:40:38 -0600 Subject: [PATCH 08/12] fix copy and paste error --- native/Cargo.lock | 1 - native/core/src/execution/datafusion/planner.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 316460bc1..852fac2fe 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -857,7 +857,6 @@ dependencies = [ "arrow-buffer", "arrow-data", "arrow-schema", - "arrow-string", "assertables", "async-trait", "brotli", diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 9135842f0..56d6dd5af 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1338,7 +1338,7 @@ impl PhysicalPlanner { &[], &[], &schema, - "bit_and", + "bit_or", false, false, ) @@ -1353,7 +1353,7 @@ impl PhysicalPlanner { &[], &[], &schema, - "bit_and", + "bit_xor", false, false, ) From 1d8e5efd351db980c4c9a7942af7282afda06950 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Jul 2024 06:25:04 -0600 Subject: [PATCH 09/12] use 40.0.0-rc1 and temporarily ignore failing test --- native/Cargo.lock | 48 +++++++++---------- native/core/Cargo.toml | 12 ++--- .../apache/comet/exec/CometExecSuite.scala | 3 +- 3 files changed, 32 insertions(+), 31 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 852fac2fe..df1828ee0 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -804,8 +804,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -904,8 +904,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -924,16 +924,16 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "tokio", ] [[package]] name = "datafusion-execution" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "arrow", "chrono", @@ -952,8 +952,8 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -970,8 +970,8 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "arrow", "base64", @@ -995,8 +995,8 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -1012,8 +1012,8 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "arrow", "async-trait", @@ -1031,8 +1031,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -1060,8 +1060,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -1073,8 +1073,8 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "ahash", "arrow", @@ -1106,8 +1106,8 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "39.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=8ae56fc2b8c8b283daa16d540fbbf84dd49e1469#8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" +version = "40.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046" dependencies = [ "arrow", "arrow-array", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 678714cbb..160db2949 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -64,12 +64,12 @@ itertools = "0.11.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } paste = "1.0.14" -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469" } -datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", features = ["unicode_expressions", "crypto_expressions"] } -datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", features = ["crypto_expressions"] } -datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } -datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } -datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "8ae56fc2b8c8b283daa16d540fbbf84dd49e1469", default-features = false } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1" } +datafusion = { default-features = false, git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", features = ["crypto_expressions"] } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", default-features = false } +datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", default-features = false } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 9cc4e7f78..276ec386e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1431,7 +1431,8 @@ class CometExecSuite extends CometTestBase { }) } - test("Windows support") { + // TODO fix regression + ignore("Windows support") { Seq("true", "false").foreach(aqeEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", From f47257ad5183e73221d8ed9af270741ad08f948a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Jul 2024 06:41:51 -0600 Subject: [PATCH 10/12] clippy --- native/core/src/execution/datafusion/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 56d6dd5af..360380400 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -675,7 +675,7 @@ impl PhysicalPlanner { DataType::Decimal256(p2, s2), EvalMode::Legacy, )); - let child = Arc::new(BinaryExpr::new(left, op.clone(), right)); + let child = Arc::new(BinaryExpr::new(left, op, right)); Ok(Arc::new(Cast::new_without_timezone( child, data_type, From 40f00aa34d8db7cf5f71422b67adaadaa6dd747c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Jul 2024 06:55:44 -0600 Subject: [PATCH 11/12] fall back to Spark for count windows aggregate --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 2 +- .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 65de37c83..64f99b746 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -208,7 +208,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim expr match { case agg: AggregateExpression => agg.aggregateFunction match { - case _: Min | _: Max | _: Count => + case _: Min | _: Max => Some(agg) case _ => withInfo(windowExpr, "Unsupported aggregate", expr) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 276ec386e..e657af9b9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1431,15 +1431,14 @@ class CometExecSuite extends CometTestBase { }) } - // TODO fix regression - ignore("Windows support") { + test("Windows support") { Seq("true", "false").foreach(aqeEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO: test nulls val aggregateFunctions = - List("COUNT(_1)", "MAX(_1)", "MIN(_1)") // TODO: Test all the aggregates + List("MAX(_1)", "MIN(_1)") // TODO: Test all the aggregates aggregateFunctions.foreach { function => val queries = Seq( From b6923c5391acae6b5a45012883e022412416c8e6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 9 Jul 2024 13:57:30 -0600 Subject: [PATCH 12/12] address feedback --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 64f99b746..da534b02c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -208,6 +208,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim expr match { case agg: AggregateExpression => agg.aggregateFunction match { + // TODO add support for Count (this was removed when upgrading + // to DataFusion 40 because it is no longer a built-in window function) + // https://github.com/apache/datafusion-comet/issues/645 case _: Min | _: Max => Some(agg) case _ =>