build: Switch back to official DataFusion repo and arrow-rs after Arrow Java 16 is released #403

Merged
12 commits merged on Jun 7, 2024
240 changes: 159 additions & 81 deletions core/Cargo.lock

Large diffs are not rendered by default.

25 changes: 14 additions & 11 deletions core/Cargo.toml
@@ -29,13 +29,14 @@ include = [

[dependencies]
parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
arrow = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-data = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-schema = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-string = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
parquet = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", default-features = false, features = ["experimental"] }
half = { version = "~2.1", default-features = false }
arrow = { version = "52.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "52.0.0" }
arrow-buffer = { version = "52.0.0" }
arrow-data = { version = "52.0.0" }
arrow-schema = { version = "52.0.0" }
arrow-string = { version = "52.0.0" }
parquet = { version = "52.0.0", default-features = false, features = ["experimental"] }
half = { version = "2.4.1", default-features = false }
futures = "0.3.28"
mimalloc = { version = "*", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"] }
@@ -66,10 +67,12 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["crypto_expressions"]}
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", default-features = false, features = ["unicode_expressions"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/bitwise_not.rs
@@ -105,8 +105,8 @@ impl PhysicalExpr for BitwiseNotExpr {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.arg.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.arg]
}

fn with_new_children(
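Note: the children() signature change from Vec<Arc<dyn PhysicalExpr>> to Vec<&Arc<dyn PhysicalExpr>> follows the upstream API at the DataFusion 39.0.0-rc1 tag this PR pins, and it repeats across every expression touched below. A minimal standalone sketch of the borrowing pattern, in plain Rust rather than the real DataFusion trait (Expr, Literal and Wrapper are illustrative names only):

use std::sync::Arc;

// Illustrative stand-in for a physical-expression trait.
trait Expr: std::fmt::Debug {}

#[derive(Debug)]
struct Literal(i32);
impl Expr for Literal {}

#[derive(Debug)]
struct Wrapper {
    arg: Arc<dyn Expr>,
}

impl Wrapper {
    // New style: hand out references, so callers that only inspect the
    // expression tree do not bump any reference counts.
    fn children(&self) -> Vec<&Arc<dyn Expr>> {
        vec![&self.arg]
    }

    // Rebuilding still takes owned Arcs; callers clone only when they
    // actually construct a new node.
    fn with_new_children(&self, mut children: Vec<Arc<dyn Expr>>) -> Wrapper {
        Wrapper { arg: children.remove(0) }
    }
}

fn main() {
    let w = Wrapper { arg: Arc::new(Literal(1)) };
    let kids = w.children(); // cheap: no Arc::clone per child
    println!("{kids:?}");
    let rebuilt = w.with_new_children(vec![Arc::new(Literal(2))]);
    println!("{rebuilt:?}");
}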
@@ -129,8 +129,8 @@ impl PhysicalExpr for BloomFilterMightContain {
})
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.bloom_filter_expr, &self.value_expr]
}

fn with_new_children(
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/cast.rs
@@ -1291,8 +1291,8 @@ impl PhysicalExpr for Cast {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/checkoverflow.rs
@@ -165,8 +165,8 @@ impl PhysicalExpr for CheckOverflow {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
12 changes: 4 additions & 8 deletions core/src/execution/datafusion/expressions/if_expr.rs
@@ -110,12 +110,8 @@ impl PhysicalExpr for IfExpr {
Ok(ColumnarValue::Array(current_value))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![
self.if_expr.clone(),
self.true_expr.clone(),
self.false_expr.clone(),
]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.if_expr, &self.true_expr, &self.false_expr]
}

fn with_new_children(
@@ -225,8 +221,8 @@ mod tests {
let true_expr = lit(123i32);
let false_expr = lit(999i32);

let expr = if_fn(if_expr, true_expr, false_expr);
let children = expr.unwrap().children();
let expr = if_fn(if_expr, true_expr, false_expr).unwrap();
let children = expr.children();
assert_eq!(children.len(), 3);
assert_eq!(children[0].to_string(), "true");
assert_eq!(children[1].to_string(), "123");
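The test change above (binding if_fn(if_expr, true_expr, false_expr).unwrap() to expr before calling children()) is forced by the same API move: children() now borrows from the expression, so it can no longer be called on an unnamed temporary. A hypothetical minimal repro, not Comet code (Node and make are made-up names):

use std::sync::Arc;

struct Node {
    child: Arc<str>,
}

impl Node {
    // Borrows from self, like the new PhysicalExpr::children.
    fn children(&self) -> Vec<&Arc<str>> {
        vec![&self.child]
    }
}

fn make() -> Result<Node, String> {
    Ok(Node { child: Arc::from("true") })
}

fn main() {
    // Does not compile: error[E0716], temporary value dropped while borrowed.
    // let children = make().unwrap().children();

    // Compiles: bind the owner first, exactly as the updated test does.
    let node = make().unwrap();
    let children = node.children();
    assert_eq!(children.len(), 1);
}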
19 changes: 10 additions & 9 deletions core/src/execution/datafusion/expressions/negative.rs
@@ -18,15 +18,15 @@
use crate::errors::CometError;
use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType};
use arrow_array::RecordBatch;
use arrow_buffer::IntervalDayTime;
use arrow_schema::{DataType, Schema};
use datafusion::{
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
physical_expr::PhysicalExpr,
};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::{
aggregate::utils::down_cast_any_ref, sort_properties::SortProperties,
};
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
use std::{
any::Any,
hash::{Hash, Hasher},
@@ -63,7 +63,7 @@ macro_rules! check_overflow {
for i in 0..typed_array.len() {
if typed_array.value(i) == $min_val {
if $type_name == "byte" || $type_name == "short" {
let value = typed_array.value(i).to_string() + " caused";
let value = format!("{:?} caused", typed_array.value(i));
return Err(arithmetic_overflow_error(value.as_str()).into());
}
return Err(arithmetic_overflow_error($type_name).into());
@@ -135,7 +135,7 @@ impl PhysicalExpr for NegativeExpr {
arrow::datatypes::IntervalUnit::DayTime => check_overflow!(
array,
arrow::array::IntervalDayTimeArray,
i64::MIN,
IntervalDayTime::MIN,
"interval"
),
arrow::datatypes::IntervalUnit::MonthDayNano => {
@@ -195,8 +195,8 @@ impl PhysicalExpr for NegativeExpr {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.arg.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.arg]
}

fn with_new_children(
@@ -255,8 +255,9 @@ impl PhysicalExpr for NegativeExpr {
}

/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-children[0]
fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
let properties = children[0].clone().with_order(children[0].sort_properties);
Ok(properties)
}
}

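The check_overflow! updates above preserve Spark's ANSI overflow semantics under the new arrow-buffer interval type: negating the minimum value of a two's-complement type (and now IntervalDayTime::MIN instead of i64::MIN) has no representable result, so the kernel must raise an error rather than wrap. A quick standard-library illustration of the underlying arithmetic, independent of Comet:

fn main() {
    // i32::MIN is -2_147_483_648; its negation, 2_147_483_648, does not fit in i32.
    assert_eq!(i32::MIN.checked_neg(), None);
    // Wrapping negation silently returns MIN again, which is what the check guards against.
    assert_eq!(i32::MIN.wrapping_neg(), i32::MIN);
    // Every other value negates cleanly.
    assert_eq!((-5i32).checked_neg(), Some(5));
}

The get_ordering to get_properties change in the same file follows the DataFusion 39 PhysicalExpr API, which folds ordering and range information into a single ExprProperties value.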
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/normalize_nan.rs
@@ -77,7 +77,7 @@ impl PhysicalExpr for NormalizeNaNAndZero {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.child.children()
}

37 changes: 13 additions & 24 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -19,7 +19,6 @@ use std::{
any::Any,
cmp::min,
fmt::{Debug, Write},
str::FromStr,
sync::Arc,
};

@@ -35,17 +34,15 @@ use arrow_array::{Array, ArrowNativeTypeOp, Decimal128Array, StringArray};
use arrow_schema::DataType;
use datafusion::{
execution::FunctionRegistry,
logical_expr::{
BuiltinScalarFunction, ScalarFunctionDefinition, ScalarFunctionImplementation,
ScalarUDFImpl, Signature, Volatility,
},
functions::math::round::round,
logical_expr::{ScalarFunctionImplementation, ScalarUDFImpl, Signature, Volatility},
physical_plan::ColumnarValue,
};
use datafusion_common::{
cast::{as_binary_array, as_generic_string_array},
exec_err, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{math_expressions, udf::ScalarUDF};
use datafusion_expr::ScalarUDF;
use num::{
integer::{div_ceil, div_floor},
BigInt, Signed, ToPrimitive,
@@ -66,9 +63,7 @@ macro_rules! make_comet_scalar_udf {
$data_type.clone(),
Arc::new(move |args| $func(args, &$data_type)),
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
($name:expr, $func:expr, without $data_type:ident) => {{
let scalar_func = CometScalarFunction::new(
@@ -77,9 +72,7 @@
$data_type,
$func,
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
}

@@ -88,7 +81,7 @@ pub fn create_comet_physical_fun(
fun_name: &str,
data_type: DataType,
registry: &dyn FunctionRegistry,
) -> Result<ScalarFunctionDefinition, DataFusionError> {
) -> Result<Arc<ScalarUDF>, DataFusionError> {
let sha2_functions = ["sha224", "sha256", "sha384", "sha512"];
match fun_name {
"ceil" => {
@@ -140,13 +133,11 @@
let spark_func_name = "spark".to_owned() + sha;
make_comet_scalar_udf!(spark_func_name, wrapped_func, without data_type)
}
_ => {
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
Ok(ScalarFunctionDefinition::BuiltIn(fun))
} else {
Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?))
}
}
_ => registry.udf(fun_name).map_err(|e| {
DataFusionError::Execution(format!(
"Function {fun_name} not found in the registry: {e}",
))
}),
}
}

@@ -509,9 +500,7 @@ fn spark_round(
make_decimal_array(array, precision, scale, &f)
}
DataType::Float32 | DataType::Float64 => {
Ok(ColumnarValue::Array(math_expressions::round(&[
array.clone()
])?))
Ok(ColumnarValue::Array(round(&[array.clone()])?))
}
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
@@ -534,7 +523,7 @@ fn spark_round(
make_decimal_scalar(a, precision, scale, &f)
}
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
ScalarValue::try_from_array(&round(&[a.to_array()?])?, 0)?,
)),
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
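With the move to DataFusion 39 the BuiltinScalarFunction fallback is gone, so create_comet_physical_fun now returns Arc<ScalarUDF> and resolves anything it does not special-case through the FunctionRegistry. A rough sketch of the UDF shape such functions plug into; SparkExampleUdf is a made-up placeholder and the exact trait surface should be checked against DataFusion 39:

use std::any::Any;
use std::sync::Arc;

use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, Volatility};

// Placeholder UDF: accepts one Float64 argument and returns the constant 0.0.
#[derive(Debug)]
struct SparkExampleUdf {
    signature: Signature,
}

impl ScalarUDFImpl for SparkExampleUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "spark_example"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }
    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
        // A real implementation computes over the input arrays here, the way
        // CometScalarFunction wraps its closure.
        Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(0.0))))
    }
}

fn make_udf() -> Arc<ScalarUDF> {
    let udf = SparkExampleUdf {
        signature: Signature::uniform(1, vec![DataType::Float64], Volatility::Immutable),
    };
    // Same construction the make_comet_scalar_udf! macro now uses.
    Arc::new(ScalarUDF::new_from_impl(udf))
}

The spark_round changes in this file are part of the same migration: math_expressions::round is replaced by datafusion::functions::math::round::round, since the math functions now live in the datafusion-functions crate.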
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/strings.rs
@@ -111,8 +111,8 @@ macro_rules! make_predicate_function {
Ok(ColumnarValue::Array(Arc::new(array)))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.left.clone(), self.right.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.left, &self.right]
}

fn with_new_children(
@@ -221,8 +221,8 @@ impl PhysicalExpr for SubstringExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
@@ -286,8 +286,8 @@ impl PhysicalExpr for StringSpaceExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/subquery.rs
@@ -199,7 +199,7 @@ impl PhysicalExpr for Subquery {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}

20 changes: 10 additions & 10 deletions core/src/execution/datafusion/expressions/temporal.rs
@@ -111,8 +111,8 @@ impl PhysicalExpr for HourExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
@@ -205,8 +205,8 @@ impl PhysicalExpr for MinuteExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
@@ -299,8 +299,8 @@ impl PhysicalExpr for SecondExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
@@ -386,8 +386,8 @@ impl PhysicalExpr for DateTruncExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
@@ -511,8 +511,8 @@ impl PhysicalExpr for TimestampTruncExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/unbound.rs
@@ -83,7 +83,7 @@ impl PhysicalExpr for UnboundColumn {
internal_err!("UnboundColumn::evaluate() should not be called")
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}

4 changes: 2 additions & 2 deletions core/src/execution/datafusion/operators/expand.rs
@@ -96,8 +96,8 @@ impl ExecutionPlan for CometExpandExec {
self.schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.child]
}

fn with_new_children(