Skip to content

Commit

Permalink
Merge commit '574dfeb29ffe80c0f223fe7551b9ab23f7f29eaa' into update_a…
Browse files Browse the repository at this point in the history
…ugust_wk_3
  • Loading branch information
itsjunetime committed Aug 26, 2024
2 parents 076fa90 + 574dfeb commit b37c5d1
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 45 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.78-bookworm as builder
FROM rust:1.78-bookworm AS builder

COPY . /usr/src/datafusion
COPY ./datafusion /usr/src/datafusion/datafusion
Expand Down
17 changes: 16 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::expr::{
};
use crate::type_coercion::binary::get_result_type;
use crate::type_coercion::functions::{
data_types_with_aggregate_udf, data_types_with_scalar_udf,
data_types_with_aggregate_udf, data_types_with_scalar_udf, data_types_with_window_udf,
};
use crate::{utils, LogicalPlan, Projection, Subquery, WindowFunctionDefinition};
use arrow::compute::can_cast_types;
Expand Down Expand Up @@ -191,6 +191,21 @@ impl ExprSchemable for Expr {
})?;
Ok(fun.return_type(&new_types, &nullability)?)
}
WindowFunctionDefinition::WindowUDF(udwf) => {
let new_types = data_types_with_window_udf(&data_types, udwf)
.map_err(|err| {
plan_datafusion_err!(
"{} {}",
err,
utils::generate_signature_error_msg(
fun.name(),
fun.signature().clone(),
&data_types
)
)
})?;
Ok(fun.return_type(&new_types, &nullability)?)
}
_ => fun.return_type(&data_types, &nullability),
}
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ impl LogicalPlanBuilder {
self.plan.schema()
}

/// Return the LogicalPlan of the plan build so far
pub fn plan(&self) -> &LogicalPlan {
&self.plan
}

/// Create an empty relation.
///
/// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
Expand Down
74 changes: 67 additions & 7 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature};
use super::binary::{binary_numeric_coercion, comparison_coercion};
use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use arrow::{
compute::can_cast_types,
datatypes::{DataType, TimeUnit},
};
use datafusion_common::utils::{coerced_fixed_size_list_to_list, list_ndims};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, plan_err, Result,
exec_err, internal_datafusion_err, internal_err, plan_err,
utils::{coerced_fixed_size_list_to_list, list_ndims},
Result,
};
use datafusion_expr_common::signature::{
ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD,
};

use super::binary::{binary_numeric_coercion, comparison_coercion};
use std::sync::Arc;

/// Performs type coercion for scalar function arguments.
///
Expand Down Expand Up @@ -66,6 +65,13 @@ pub fn data_types_with_scalar_udf(
try_coerce_types(valid_types, current_types, &signature.type_signature)
}

/// Performs type coercion for aggregate function arguments.
///
/// Returns the data types to which each argument must be coerced to
/// match `signature`.
///
/// For more details on coercion in general, please see the
/// [`type_coercion`](crate::type_coercion) module.
pub fn data_types_with_aggregate_udf(
current_types: &[DataType],
func: &AggregateUDF,
Expand Down Expand Up @@ -95,6 +101,39 @@ pub fn data_types_with_aggregate_udf(
try_coerce_types(valid_types, current_types, &signature.type_signature)
}

/// Performs type coercion for window function arguments.
///
/// Returns the data types to which each argument must be coerced to
/// match `signature`.
///
/// For more details on coercion in general, please see the
/// [`type_coercion`](crate::type_coercion) module.
pub fn data_types_with_window_udf(
current_types: &[DataType],
func: &WindowUDF,
) -> Result<Vec<DataType>> {
let signature = func.signature();

if current_types.is_empty() {
if signature.type_signature.supports_zero_argument() {
return Ok(vec![]);
} else {
return plan_err!("{} does not support zero arguments.", func.name());
}
}

let valid_types =
get_valid_types_with_window_udf(&signature.type_signature, current_types, func)?;
if valid_types
.iter()
.any(|data_type| data_type == current_types)
{
return Ok(current_types.to_vec());
}

try_coerce_types(valid_types, current_types, &signature.type_signature)
}

/// Performs type coercion for function arguments.
///
/// Returns the data types to which each argument must be coerced to
Expand Down Expand Up @@ -205,6 +244,27 @@ fn get_valid_types_with_aggregate_udf(
Ok(valid_types)
}

fn get_valid_types_with_window_udf(
signature: &TypeSignature,
current_types: &[DataType],
func: &WindowUDF,
) -> Result<Vec<Vec<DataType>>> {
let valid_types = match signature {
TypeSignature::UserDefined => match func.coerce_types(current_types) {
Ok(coerced_types) => vec![coerced_types],
Err(e) => return exec_err!("User-defined coercion failed with {:?}", e),
},
TypeSignature::OneOf(signatures) => signatures
.iter()
.filter_map(|t| get_valid_types_with_window_udf(t, current_types, func).ok())
.flatten()
.collect::<Vec<_>>(),
_ => get_valid_types(signature, current_types)?,
};

Ok(valid_types)
}

/// Returns a Vec of all possible valid argument types for the given signature.
fn get_valid_types(
signature: &TypeSignature,
Expand Down
30 changes: 29 additions & 1 deletion datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{

use arrow::datatypes::DataType;

use datafusion_common::Result;
use datafusion_common::{not_impl_err, Result};

use crate::expr::WindowFunction;
use crate::{
Expand Down Expand Up @@ -192,6 +192,11 @@ impl WindowUDF {
pub fn sort_options(&self) -> Option<SortOptions> {
self.inner.sort_options()
}

/// See [`WindowUDFImpl::coerce_types`] for more details.
pub fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
}

impl<F> From<F> for WindowUDF
Expand Down Expand Up @@ -353,6 +358,29 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
fn sort_options(&self) -> Option<SortOptions> {
None
}

/// Coerce arguments of a function call to types that the function can evaluate.
///
/// This function is only called if [`WindowUDFImpl::signature`] returns [`crate::TypeSignature::UserDefined`]. Most
/// UDWFs should return one of the other variants of `TypeSignature` which handle common
/// cases
///
/// See the [type coercion module](crate::type_coercion)
/// documentation for more details on type coercion
///
/// For example, if your function requires a floating point arguments, but the user calls
/// it like `my_func(1::int)` (aka with `1` as an integer), coerce_types could return `[DataType::Float64]`
/// to ensure the argument was cast to `1::double`
///
/// # Parameters
/// * `arg_types`: The argument types of the arguments this function with
///
/// # Return value
/// A Vec the same length as `arg_types`. DataFusion will `CAST` the function call
/// arguments to these specific types.
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
not_impl_err!("Function {} does not implement coerce_types", self.name())
}
}

/// WindowUDF that adds an alias to the underlying function. It is better to
Expand Down
93 changes: 62 additions & 31 deletions datafusion/functions-nested/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ use arrow::datatypes::{DataType, Field};
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType};
use arrow_array::NullArray;
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer};
use arrow_schema::DataType::{Date32, Int64, Interval, List};
use arrow_schema::DataType::*;
use arrow_schema::IntervalUnit::MonthDayNano;
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array};
use datafusion_common::{exec_err, not_impl_datafusion_err, Result};
use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use itertools::Itertools;
use std::any::Any;
use std::iter::from_fn;
use std::sync::Arc;
Expand All @@ -49,16 +48,7 @@ pub(super) struct Range {
impl Range {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![Int64]),
TypeSignature::Exact(vec![Int64, Int64]),
TypeSignature::Exact(vec![Int64, Int64, Int64]),
TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
TypeSignature::Any(3),
],
Volatility::Immutable,
),
signature: Signature::user_defined(Volatility::Immutable),
aliases: vec![],
}
}
Expand All @@ -75,9 +65,34 @@ impl ScalarUDFImpl for Range {
&self.signature
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
arg_types
.iter()
.map(|arg_type| match arg_type {
Null => Ok(Null),
Int8 => Ok(Int64),
Int16 => Ok(Int64),
Int32 => Ok(Int64),
Int64 => Ok(Int64),
UInt8 => Ok(Int64),
UInt16 => Ok(Int64),
UInt32 => Ok(Int64),
UInt64 => Ok(Int64),
Timestamp(_, _) => Ok(Date32),
Date32 => Ok(Date32),
Date64 => Ok(Date32),
Utf8 => Ok(Date32),
LargeUtf8 => Ok(Date32),
Utf8View => Ok(Date32),
Interval(_) => Ok(Interval(MonthDayNano)),
_ => exec_err!("Unsupported DataType"),
})
.try_collect()
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.iter().any(|t| t.eq(&DataType::Null)) {
Ok(DataType::Null)
if arg_types.iter().any(|t| t.is_null()) {
Ok(Null)
} else {
Ok(List(Arc::new(Field::new(
"item",
Expand All @@ -88,7 +103,7 @@ impl ScalarUDFImpl for Range {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.iter().any(|arg| arg.data_type() == DataType::Null) {
if args.iter().any(|arg| arg.data_type().is_null()) {
return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
}
match args[0].data_type() {
Expand Down Expand Up @@ -120,16 +135,7 @@ pub(super) struct GenSeries {
impl GenSeries {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![Int64]),
TypeSignature::Exact(vec![Int64, Int64]),
TypeSignature::Exact(vec![Int64, Int64, Int64]),
TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]),
TypeSignature::Any(3),
],
Volatility::Immutable,
),
signature: Signature::user_defined(Volatility::Immutable),
aliases: vec![],
}
}
Expand All @@ -146,9 +152,34 @@ impl ScalarUDFImpl for GenSeries {
&self.signature
}

fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> {
_arg_types
.iter()
.map(|arg_type| match arg_type {
Null => Ok(Null),
Int8 => Ok(Int64),
Int16 => Ok(Int64),
Int32 => Ok(Int64),
Int64 => Ok(Int64),
UInt8 => Ok(Int64),
UInt16 => Ok(Int64),
UInt32 => Ok(Int64),
UInt64 => Ok(Int64),
Timestamp(_, _) => Ok(Date32),
Date32 => Ok(Date32),
Date64 => Ok(Date32),
Utf8 => Ok(Date32),
LargeUtf8 => Ok(Date32),
Utf8View => Ok(Date32),
Interval(_) => Ok(Interval(MonthDayNano)),
_ => exec_err!("Unsupported DataType"),
})
.try_collect()
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
if arg_types.iter().any(|t| t.eq(&DataType::Null)) {
Ok(DataType::Null)
if arg_types.iter().any(|t| t.is_null()) {
Ok(Null)
} else {
Ok(List(Arc::new(Field::new(
"item",
Expand All @@ -159,15 +190,15 @@ impl ScalarUDFImpl for GenSeries {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.iter().any(|arg| arg.data_type() == DataType::Null) {
if args.iter().any(|arg| arg.data_type().is_null()) {
return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1))));
}
match args[0].data_type() {
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args),
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args),
dt => {
exec_err!(
"unsupported type for range. Expected Int64 or Date32, got: {}",
"unsupported type for gen_series. Expected Int64 or Date32, got: {}",
dt
)
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5804,7 +5804,7 @@ select generate_series(5),
----
[0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1] [10, 7, 4] [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01, 1993-03-01] [1993-02-01, 1993-01-31, 1993-01-30, 1993-01-29, 1993-01-28, 1993-01-27, 1993-01-26, 1993-01-25, 1993-01-24, 1993-01-23, 1993-01-22, 1993-01-21, 1993-01-20, 1993-01-19, 1993-01-18, 1993-01-17, 1993-01-16, 1993-01-15, 1993-01-14, 1993-01-13, 1993-01-12, 1993-01-11, 1993-01-10, 1993-01-09, 1993-01-08, 1993-01-07, 1993-01-06, 1993-01-05, 1993-01-04, 1993-01-03, 1993-01-02, 1993-01-01] [1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01]

query error DataFusion error: Execution error: unsupported type for range. Expected Int64 or Date32, got: Timestamp\(Nanosecond, None\)
query error DataFusion error: Execution error: Cannot generate date range less than 1 day\.
select generate_series('2021-01-01'::timestamp, '2021-01-02'::timestamp, INTERVAL '1' HOUR);

## should return NULL
Expand Down Expand Up @@ -5936,11 +5936,12 @@ select generate_series(start, '1993-03-01'::date, INTERVAL '1 year') from date_t


# https://github.com/apache/datafusion/issues/11922
query error
query ?
select generate_series(start, '1993-03-01', INTERVAL '1 year') from date_table;
----
DataFusion error: Internal error: could not cast value to arrow_array::array::primitive_array::PrimitiveArray<arrow_array::types::Date32Type>.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
[1992-01-01, 1993-01-01]
[1993-02-01]
[1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01]


## array_except
Expand Down
Loading

0 comments on commit b37c5d1

Please sign in to comment.