From cd9237f96d60d48711d38b27707ad14896d8e609 Mon Sep 17 00:00:00 2001 From: Tai Le Manh <49281946+tlm365@users.noreply.github.com> Date: Sun, 18 Aug 2024 17:49:24 +0700 Subject: [PATCH 1/4] Minor: Remove warning when building datafusion-cli from Dockerfile (#12018) Signed-off-by: Tai Le Manh --- datafusion-cli/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/Dockerfile b/datafusion-cli/Dockerfile index d231da62a2fd..7adead64db57 100644 --- a/datafusion-cli/Dockerfile +++ b/datafusion-cli/Dockerfile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -FROM rust:1.78-bookworm as builder +FROM rust:1.78-bookworm AS builder COPY . /usr/src/datafusion COPY ./datafusion /usr/src/datafusion/datafusion From 950dc73c7f763ae8dc56c0a99de864dff444f22b Mon Sep 17 00:00:00 2001 From: Matt Green Date: Sun, 18 Aug 2024 03:49:43 -0700 Subject: [PATCH 2/4] add getter method for LogicalPlanBuilder.plan (#12038) --- datafusion/expr/src/logical_plan/builder.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f9769560b251..aa28c1c19242 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -107,6 +107,11 @@ impl LogicalPlanBuilder { self.plan.schema() } + /// Return the LogicalPlan built so far + pub fn plan(&self) -> &LogicalPlan { + &self.plan + } + /// Create an empty relation. /// /// `produce_one_row` set to true means this empty node needs to produce a placeholder row. 
From a91be04ced3746c673788d5da124c6d30009d9ff Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Sun, 18 Aug 2024 20:47:21 +0800 Subject: [PATCH 3/4] Window UDF signature check (#12045) * udwf sig Signed-off-by: jayzhan211 * add coerce_types Signed-off-by: jayzhan211 * add doc Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- datafusion/expr/src/expr_schema.rs | 17 ++++- .../expr/src/type_coercion/functions.rs | 74 +++++++++++++++++-- datafusion/expr/src/udwf.rs | 30 +++++++- datafusion/sqllogictest/test_files/window.slt | 18 +++++ 4 files changed, 130 insertions(+), 9 deletions(-) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index af35b9a9910d..f6489fef14a1 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -22,7 +22,7 @@ use crate::expr::{ }; use crate::type_coercion::binary::get_result_type; use crate::type_coercion::functions::{ - data_types_with_aggregate_udf, data_types_with_scalar_udf, + data_types_with_aggregate_udf, data_types_with_scalar_udf, data_types_with_window_udf, }; use crate::{utils, LogicalPlan, Projection, Subquery, WindowFunctionDefinition}; use arrow::compute::can_cast_types; @@ -191,6 +191,21 @@ impl ExprSchemable for Expr { })?; Ok(fun.return_type(&new_types, &nullability)?) } + WindowFunctionDefinition::WindowUDF(udwf) => { + let new_types = data_types_with_window_udf(&data_types, udwf) + .map_err(|err| { + plan_datafusion_err!( + "{} {}", + err, + utils::generate_signature_error_msg( + fun.name(), + fun.signature().clone(), + &data_types + ) + ) + })?; + Ok(fun.return_type(&new_types, &nullability)?) 
+ } _ => fun.return_type(&data_types, &nullability), } } diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 190374b01dd2..b0b14a1a4e6e 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -15,22 +15,21 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature}; +use super::binary::{binary_numeric_coercion, comparison_coercion}; +use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; use arrow::{ compute::can_cast_types, datatypes::{DataType, TimeUnit}, }; -use datafusion_common::utils::{coerced_fixed_size_list_to_list, list_ndims}; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, plan_err, Result, + exec_err, internal_datafusion_err, internal_err, plan_err, + utils::{coerced_fixed_size_list_to_list, list_ndims}, + Result, }; use datafusion_expr_common::signature::{ ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD, }; - -use super::binary::{binary_numeric_coercion, comparison_coercion}; +use std::sync::Arc; /// Performs type coercion for scalar function arguments. /// @@ -66,6 +65,13 @@ pub fn data_types_with_scalar_udf( try_coerce_types(valid_types, current_types, &signature.type_signature) } +/// Performs type coercion for aggregate function arguments. +/// +/// Returns the data types to which each argument must be coerced to +/// match `signature`. +/// +/// For more details on coercion in general, please see the +/// [`type_coercion`](crate::type_coercion) module. pub fn data_types_with_aggregate_udf( current_types: &[DataType], func: &AggregateUDF, @@ -95,6 +101,39 @@ pub fn data_types_with_aggregate_udf( try_coerce_types(valid_types, current_types, &signature.type_signature) } +/// Performs type coercion for window function arguments. 
+/// +/// Returns the data types to which each argument must be coerced to +/// match `signature`. +/// +/// For more details on coercion in general, please see the +/// [`type_coercion`](crate::type_coercion) module. +pub fn data_types_with_window_udf( + current_types: &[DataType], + func: &WindowUDF, +) -> Result> { + let signature = func.signature(); + + if current_types.is_empty() { + if signature.type_signature.supports_zero_argument() { + return Ok(vec![]); + } else { + return plan_err!("{} does not support zero arguments.", func.name()); + } + } + + let valid_types = + get_valid_types_with_window_udf(&signature.type_signature, current_types, func)?; + if valid_types + .iter() + .any(|data_type| data_type == current_types) + { + return Ok(current_types.to_vec()); + } + + try_coerce_types(valid_types, current_types, &signature.type_signature) +} + /// Performs type coercion for function arguments. /// /// Returns the data types to which each argument must be coerced to @@ -205,6 +244,27 @@ fn get_valid_types_with_aggregate_udf( Ok(valid_types) } +fn get_valid_types_with_window_udf( + signature: &TypeSignature, + current_types: &[DataType], + func: &WindowUDF, +) -> Result>> { + let valid_types = match signature { + TypeSignature::UserDefined => match func.coerce_types(current_types) { + Ok(coerced_types) => vec![coerced_types], + Err(e) => return exec_err!("User-defined coercion failed with {:?}", e), + }, + TypeSignature::OneOf(signatures) => signatures + .iter() + .filter_map(|t| get_valid_types_with_window_udf(t, current_types, func).ok()) + .flatten() + .collect::>(), + _ => get_valid_types(signature, current_types)?, + }; + + Ok(valid_types) +} + /// Returns a Vec of all possible valid argument types for the given signature. 
fn get_valid_types( signature: &TypeSignature, diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index aa754a57086f..88b3d613cb43 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -27,7 +27,7 @@ use std::{ use arrow::datatypes::DataType; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; use crate::expr::WindowFunction; use crate::{ @@ -192,6 +192,11 @@ impl WindowUDF { pub fn sort_options(&self) -> Option { self.inner.sort_options() } + + /// See [`WindowUDFImpl::coerce_types`] for more details. + pub fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.inner.coerce_types(arg_types) + } } impl From for WindowUDF @@ -353,6 +358,29 @@ pub trait WindowUDFImpl: Debug + Send + Sync { fn sort_options(&self) -> Option { None } + + /// Coerce arguments of a function call to types that the function can evaluate. + /// + /// This function is only called if [`WindowUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most + /// UDWFs should return one of the other variants of `TypeSignature` which handle common + /// cases + /// + /// See the [type coercion module](crate::type_coercion) + /// documentation for more details on type coercion + /// + /// For example, if your function requires floating point arguments, but the user calls + /// it like `my_func(1::int)` (aka with `1` as an integer), coerce_types could return `[DataType::Float64]` + /// to ensure the argument was cast to `1::double` + /// + /// # Parameters + /// * `arg_types`: The types of the arguments this function is called with + /// + /// # Return value + /// A Vec the same length as `arg_types`. DataFusion will `CAST` the function call + /// arguments to these specific types. + fn coerce_types(&self, _arg_types: &[DataType]) -> Result> { + not_impl_err!("Function {} does not implement coerce_types", self.name()) + } } /// WindowUDF that adds an alias to the underlying function. 
It is better to diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 0bf7a8a1eb1b..ef6746730eb6 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4879,3 +4879,21 @@ SELECT lead(column2, 1.1) OVER (order by column1) FROM t; query error DataFusion error: Execution error: Expected an integer value SELECT nth_value(column2, 1.1) OVER (order by column1) FROM t; + +statement ok +drop table t; + +statement ok +create table t(a int, b int) as values (1, 2) + +query II +select a, row_number() over (order by b) as rn from t; +---- +1 1 + +# RowNumber expects 0 args. +query error +select a, row_number(a) over (order by b) as rn from t; + +statement ok +drop table t; From 574dfeb29ffe80c0f223fe7551b9ab23f7f29eaa Mon Sep 17 00:00:00 2001 From: Namgung Chan <33323415+getChan@users.noreply.github.com> Date: Mon, 19 Aug 2024 09:48:25 +0900 Subject: [PATCH 4/4] Fix: generate_series function support string type (#12002) * fix: sqllogictest * Revert "fix: sqllogictest" This reverts commit 4957a1d24a5102408ac3897ff93b2f2e5ad477ff. * fix: sqllogictest * remove any type signature * coerce type from null to date32 * fmt * slt * Revert "coerce type from null to date32" This reverts commit bccdc2e56b415066f1cdeb4ab671894c4562b1fe. 
* replace type coerce by `coerce_types` method * fmt * fix underscored param --- datafusion/functions-nested/src/range.rs | 93 +++++++++++++------- datafusion/sqllogictest/test_files/array.slt | 9 +- 2 files changed, 67 insertions(+), 35 deletions(-) diff --git a/datafusion/functions-nested/src/range.rs b/datafusion/functions-nested/src/range.rs index 5b7315719631..90cf8bcbd057 100644 --- a/datafusion/functions-nested/src/range.rs +++ b/datafusion/functions-nested/src/range.rs @@ -23,13 +23,12 @@ use arrow::datatypes::{DataType, Field}; use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; use arrow_array::NullArray; use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; -use arrow_schema::DataType::{Date32, Int64, Interval, List}; +use arrow_schema::DataType::*; use arrow_schema::IntervalUnit::MonthDayNano; use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; use datafusion_common::{exec_err, not_impl_datafusion_err, Result}; -use datafusion_expr::{ - ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility, -}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use itertools::Itertools; use std::any::Any; use std::iter::from_fn; use std::sync::Arc; @@ -49,16 +48,7 @@ pub(super) struct Range { impl Range { pub fn new() -> Self { Self { - signature: Signature::one_of( - vec![ - TypeSignature::Exact(vec![Int64]), - TypeSignature::Exact(vec![Int64, Int64]), - TypeSignature::Exact(vec![Int64, Int64, Int64]), - TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), - TypeSignature::Any(3), - ], - Volatility::Immutable, - ), + signature: Signature::user_defined(Volatility::Immutable), aliases: vec![], } } @@ -75,9 +65,34 @@ impl ScalarUDFImpl for Range { &self.signature } + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + arg_types + .iter() + .map(|arg_type| match arg_type { + Null => Ok(Null), + Int8 => Ok(Int64), + Int16 => Ok(Int64), + Int32 => 
Ok(Int64), + Int64 => Ok(Int64), + UInt8 => Ok(Int64), + UInt16 => Ok(Int64), + UInt32 => Ok(Int64), + UInt64 => Ok(Int64), + Timestamp(_, _) => Ok(Date32), + Date32 => Ok(Date32), + Date64 => Ok(Date32), + Utf8 => Ok(Date32), + LargeUtf8 => Ok(Date32), + Utf8View => Ok(Date32), + Interval(_) => Ok(Interval(MonthDayNano)), + _ => exec_err!("Unsupported DataType"), + }) + .try_collect() + } + fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.iter().any(|t| t.eq(&DataType::Null)) { - Ok(DataType::Null) + if arg_types.iter().any(|t| t.is_null()) { + Ok(Null) } else { Ok(List(Arc::new(Field::new( "item", @@ -88,7 +103,7 @@ impl ScalarUDFImpl for Range { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - if args.iter().any(|arg| arg.data_type() == DataType::Null) { + if args.iter().any(|arg| arg.data_type().is_null()) { return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1)))); } match args[0].data_type() { @@ -120,16 +135,7 @@ pub(super) struct GenSeries { impl GenSeries { pub fn new() -> Self { Self { - signature: Signature::one_of( - vec![ - TypeSignature::Exact(vec![Int64]), - TypeSignature::Exact(vec![Int64, Int64]), - TypeSignature::Exact(vec![Int64, Int64, Int64]), - TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), - TypeSignature::Any(3), - ], - Volatility::Immutable, - ), + signature: Signature::user_defined(Volatility::Immutable), aliases: vec![], } } @@ -146,9 +152,34 @@ impl ScalarUDFImpl for GenSeries { &self.signature } + fn coerce_types(&self, _arg_types: &[DataType]) -> Result> { + _arg_types + .iter() + .map(|arg_type| match arg_type { + Null => Ok(Null), + Int8 => Ok(Int64), + Int16 => Ok(Int64), + Int32 => Ok(Int64), + Int64 => Ok(Int64), + UInt8 => Ok(Int64), + UInt16 => Ok(Int64), + UInt32 => Ok(Int64), + UInt64 => Ok(Int64), + Timestamp(_, _) => Ok(Date32), + Date32 => Ok(Date32), + Date64 => Ok(Date32), + Utf8 => Ok(Date32), + LargeUtf8 => Ok(Date32), + Utf8View => Ok(Date32), + Interval(_) => 
Ok(Interval(MonthDayNano)), + _ => exec_err!("Unsupported DataType"), + }) + .try_collect() + } + fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.iter().any(|t| t.eq(&DataType::Null)) { - Ok(DataType::Null) + if arg_types.iter().any(|t| t.is_null()) { + Ok(Null) } else { Ok(List(Arc::new(Field::new( "item", @@ -159,7 +190,7 @@ impl ScalarUDFImpl for GenSeries { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - if args.iter().any(|arg| arg.data_type() == DataType::Null) { + if args.iter().any(|arg| arg.data_type().is_null()) { return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1)))); } match args[0].data_type() { @@ -167,7 +198,7 @@ impl ScalarUDFImpl for GenSeries { Date32 => make_scalar_function(|args| gen_range_date(args, true))(args), dt => { exec_err!( - "unsupported type for range. Expected Int64 or Date32, got: {}", + "unsupported type for gen_series. Expected Int64 or Date32, got: {}", dt ) } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index b97ecced57e3..249241a51aea 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5804,7 +5804,7 @@ select generate_series(5), ---- [0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1] [10, 7, 4] [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01, 1993-03-01] [1993-02-01, 1993-01-31, 1993-01-30, 1993-01-29, 1993-01-28, 1993-01-27, 1993-01-26, 1993-01-25, 1993-01-24, 1993-01-23, 1993-01-22, 1993-01-21, 1993-01-20, 1993-01-19, 1993-01-18, 1993-01-17, 1993-01-16, 1993-01-15, 1993-01-14, 1993-01-13, 1993-01-12, 1993-01-11, 1993-01-10, 1993-01-09, 1993-01-08, 1993-01-07, 1993-01-06, 1993-01-05, 1993-01-04, 1993-01-03, 1993-01-02, 1993-01-01] [1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01] -query error DataFusion error: Execution error: unsupported type for range. 
Expected Int64 or Date32, got: Timestamp\(Nanosecond, None\) +query error DataFusion error: Execution error: Cannot generate date range less than 1 day\. select generate_series('2021-01-01'::timestamp, '2021-01-02'::timestamp, INTERVAL '1' HOUR); ## should return NULL @@ -5936,11 +5936,12 @@ select generate_series(start, '1993-03-01'::date, INTERVAL '1 year') from date_t # https://github.com/apache/datafusion/issues/11922 -query error +query ? select generate_series(start, '1993-03-01', INTERVAL '1 year') from date_table; ---- -DataFusion error: Internal error: could not cast value to arrow_array::array::primitive_array::PrimitiveArray. -This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +[1992-01-01, 1993-01-01] +[1993-02-01] +[1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01] ## array_except