Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace the arithmetic op for decimal array op decimal array using arrow kernel #4648

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions datafusion/core/tests/sql/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,20 +582,20 @@ async fn decimal_arithmetic_op() -> Result<()> {
"+---------------------------------------+",
"| decimal_simple.c1 / decimal_simple.c5 |",
"+---------------------------------------+",
"| 0.7142857142857143296 |",
"| 0.7142857142857142857 |",
"| 0.8000000000000000000 |",
"| 1.0526315789473683456 |",
"| 1.0526315789473684210 |",
"| 0.9375000000000000000 |",
"| 0.8571428571428571136 |",
"| 2.7272727272727269376 |",
"| 0.9090909090909090816 |",
"| 0.8571428571428571428 |",
"| 2.7272727272727272727 |",
"| 0.9090909090909090909 |",
"| 1.0000000000000000000 |",
"| 1.0000000000000000000 |",
"| 0.9090909090909090816 |",
"| 0.9615384615384614912 |",
"| 0.6410256410256410624 |",
"| 1.5151515151515152384 |",
"| 0.7352941176470588416 |",
"| 0.9090909090909090909 |",
"| 0.9615384615384615384 |",
"| 0.6410256410256410256 |",
"| 1.5151515151515151515 |",
"| 0.7352941176470588235 |",
"| 0.5000000000000000000 |",
"+---------------------------------------+",
];
Expand Down
42 changes: 41 additions & 1 deletion datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,9 @@ mod tests {
use super::*;
use crate::expressions::try_cast;
use crate::expressions::{col, lit};
use arrow::datatypes::{ArrowNumericType, Field, Int32Type, SchemaRef};
use arrow::datatypes::{
ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef,
};
use datafusion_common::{ColumnStatistics, Result, Statistics};
use datafusion_expr::type_coercion::binary::coerce_types;

Expand Down Expand Up @@ -3048,6 +3050,43 @@ mod tests {
Ok(())
}

/// Division where the final row has a zero divisor: the expected output holds
/// a null in that slot, for both an `Int32` input and a `Decimal128(25, 3)` input.
#[test]
fn arithmetic_divide_zero() -> Result<()> {
    // Plain integer columns; only the last pair (100 / 0) divides by zero.
    let int_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    let int_dividends = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048, 100]));
    let int_divisors = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32, 0]));
    let int_expected =
        Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64), None]);

    apply_arithmetic::<Int32Type>(
        int_schema,
        vec![int_dividends, int_divisors],
        Operator::Divide,
        int_expected,
    )?;

    // Decimal columns; the second pair has a zero divisor.
    let decimal_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Decimal128(25, 3), true),
        Field::new("b", DataType::Decimal128(25, 3), true),
    ]));
    let decimal_dividends =
        Arc::new(create_decimal_array(&[Some(1234567), Some(1234567)], 25, 3));
    let decimal_divisors = Arc::new(create_decimal_array(&[Some(10), Some(0)], 25, 3));
    let decimal_expected = create_decimal_array(&[Some(123456700), None], 25, 3);

    apply_arithmetic::<Decimal128Type>(
        decimal_schema,
        vec![decimal_dividends, decimal_divisors],
        Operator::Divide,
        decimal_expected,
    )?;

    Ok(())
}

#[test]
fn bitwise_array_test() -> Result<()> {
let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
Expand Down Expand Up @@ -3270,6 +3309,7 @@ mod tests {
}
Ok(())
}

#[test]
fn test_comparison_result_estimate_different_type() -> Result<()> {
// A table where the column 'a' has a min of 1.3, a max of 50.7.
Expand Down
122 changes: 30 additions & 92 deletions datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
//! This module contains computation kernels that are eventually
//! destined for arrow-rs but are in datafusion until they are ported.

use arrow::error::ArrowError;
use arrow::compute::{
add, add_scalar, divide_opt, divide_scalar, modulus, modulus_scalar, multiply,
multiply_scalar, subtract, subtract_scalar,
};
use arrow::{array::*, datatypes::ArrowNumericType};
use datafusion_common::{DataFusionError, Result};
use datafusion_common::Result;

// Simple (low performance) kernels until optimized kernels are added to arrow
// See https://github.com/apache/arrow-rs/issues/960
Expand Down Expand Up @@ -171,61 +174,20 @@ pub(crate) fn is_not_distinct_from_decimal(
.collect())
}

/// Builds a `Decimal128Array` by pairing up the elements of `left` and
/// `right` and applying `op` to every pair in which both values are non-null.
///
/// A pair with a null on either side produces a null output slot. If `op`
/// returns an error, collection stops and that error is returned.
pub(crate) fn arith_decimal<F>(
    left: &Decimal128Array,
    right: &Decimal128Array,
    op: F,
) -> Result<Decimal128Array>
where
    F: Fn(i128, i128) -> Result<i128>,
{
    left.iter()
        .zip(right.iter())
        .map(|operands| match operands {
            (Some(lhs), Some(rhs)) => op(lhs, rhs).map(Some),
            _ => Ok(None),
        })
        .collect()
}

/// Builds a `Decimal128Array` by applying `op(value, right)` to every
/// non-null element of `left`.
///
/// Null elements of `left` stay null in the output. If `op` returns an
/// error, collection stops and that error is returned.
pub(crate) fn arith_decimal_scalar<F>(
    left: &Decimal128Array,
    right: i128,
    op: F,
) -> Result<Decimal128Array>
where
    F: Fn(i128, i128) -> Result<i128>,
{
    left.iter()
        .map(|value| value.map(|lhs| op(lhs, right)).transpose())
        .collect()
}

/// Element-wise addition of two decimal arrays.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): two `let array = ...` bindings appear below, one built on the
// closure-based `arith_decimal` helper and one on the arrow `add` kernel. This
// looks like the before/after lines of a diff shown together — confirm which
// one is current before relying on this text.
pub(crate) fn add_decimal(
left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
let array = arith_decimal(left, right, |left, right| Ok(left + right))?
.with_precision_and_scale(left.precision(), left.scale())?;
let array =
add(left, right)?.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}

/// Adds the scalar `right` to every element of the decimal array `left`.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): the `arith_decimal_scalar` line and the `add_scalar` line below
// look like the before/after lines of a diff shown together (the first has no
// terminating `;`) — confirm which one is current.
pub(crate) fn add_decimal_scalar(
left: &Decimal128Array,
right: i128,
) -> Result<Decimal128Array> {
let array = arith_decimal_scalar(left, right, |left, right| Ok(left + right))?
let array = add_scalar(left, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}
Expand All @@ -234,7 +196,7 @@ pub(crate) fn subtract_decimal(
left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
let array = arith_decimal(left, right, |left, right| Ok(left - right))?
let array = subtract(left, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}
Expand All @@ -243,7 +205,7 @@ pub(crate) fn subtract_decimal_scalar(
left: &Decimal128Array,
right: i128,
) -> Result<Decimal128Array> {
let array = arith_decimal_scalar(left, right, |left, right| Ok(left - right))?
let array = subtract_scalar(left, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}
Expand All @@ -253,7 +215,8 @@ pub(crate) fn multiply_decimal(
right: &Decimal128Array,
) -> Result<Decimal128Array> {
let divide = 10_i128.pow(left.scale() as u32);
let array = arith_decimal(left, right, |left, right| Ok(left * right / divide))?
let array = multiply(left, right)?;
let array = divide_scalar(&array, divide)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}
Expand All @@ -262,72 +225,51 @@ pub(crate) fn multiply_decimal_scalar(
left: &Decimal128Array,
right: i128,
) -> Result<Decimal128Array> {
let array = multiply_scalar(left, right)?;
let divide = 10_i128.pow(left.scale() as u32);
let array =
arith_decimal_scalar(left, right, |left, right| Ok(left * right / divide))?
.with_precision_and_scale(left.precision(), left.scale())?;
let array = divide_scalar(&array, divide)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}

/// Element-wise division of the decimal array `left` by the decimal array `right`.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): two implementations appear below, a float-based closure passed
// to `arith_decimal` and an integer version built on the arrow `multiply_scalar`
// / `divide_opt` kernels. This looks like the before/after lines of a diff shown
// together — confirm which one is current.
pub(crate) fn divide_opt_decimal(
left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
// Float-based variant: scale factor 10^scale as an f64.
let mul = 10_f64.powi(left.scale() as i32);
let array = arith_decimal(left, right, |left, right| {
// A zero divisor is reported as an arrow `DivideByZero` error.
if right == 0 {
return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
}
// The quotient is computed in f64 and cast back to i128, so digits beyond
// f64 precision are not exact.
let l_value = left as f64;
let r_value = right as f64;
let result = ((l_value / r_value) * mul) as i128;
Ok(result)
})?
.with_precision_and_scale(left.precision(), left.scale())?;
// Integer variant: multiply the dividend by 10^scale before dividing.
// Presumably this keeps the integer quotient at `left`'s scale — assumes both
// operands share that scale; TODO confirm.
let mul = 10_i128.pow(left.scale() as u32);
let array = multiply_scalar(left, mul)?;
// NOTE(review): per the discussion on this change, `divide_opt` yields a null
// for a zero divisor instead of returning an error.
let array = divide_opt(&array, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}

/// Divides every element of the decimal array `left` by the scalar `right`.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): two implementations appear below, an explicit zero check plus a
// float-based closure passed to `arith_decimal_scalar`, and an integer version
// built on the arrow `multiply_scalar` / `divide_scalar` kernels. This looks like
// the before/after lines of a diff shown together — confirm which one is current.
pub(crate) fn divide_decimal_scalar(
left: &Decimal128Array,
right: i128,
) -> Result<Decimal128Array> {
// Explicit guard: a zero divisor is reported as an arrow `DivideByZero` error.
if right == 0 {
return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
}
// Float-based variant: scale factor 10^scale as an f64.
let mul = 10_f64.powi(left.scale() as i32);
let array = arith_decimal_scalar(left, right, |left, right| {
// The quotient is computed in f64 and cast back to i128, so digits beyond
// f64 precision are not exact.
let l_value = left as f64;
let r_value = right as f64;
let result = ((l_value / r_value) * mul) as i128;
Ok(result)
})?
.with_precision_and_scale(left.precision(), left.scale())?;
// Integer variant: multiply the dividend by 10^scale before dividing.
let mul = 10_i128.pow(left.scale() as u32);
let array = multiply_scalar(left, mul)?;
// A `right` of zero is checked for inside `divide_scalar`.
let array = divide_scalar(&array, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}

/// Element-wise remainder of the decimal array `left` divided by the decimal
/// array `right`.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): two `let array = ...` bindings appear below, one built on the
// closure-based `arith_decimal` helper and one on the arrow `modulus` kernel.
// This looks like the before/after lines of a diff shown together — confirm
// which one is current.
pub(crate) fn modulus_decimal(
left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
let array = arith_decimal(left, right, |left, right| {
// A zero divisor is reported as an arrow `DivideByZero` error.
if right == 0 {
Err(DataFusionError::ArrowError(ArrowError::DivideByZero))
} else {
Ok(left % right)
}
})?
.with_precision_and_scale(left.precision(), left.scale())?;
let array =
modulus(left, right)?.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}

/// Remainder of every element of the decimal array `left` divided by the
/// scalar `right`.
///
/// The returned array is tagged with the precision and scale of `left`.
// NOTE(review): the explicit zero check with the `arith_decimal_scalar` line and
// the `modulus_scalar` line below look like the before/after lines of a diff
// shown together (the former has no terminating `;`) — confirm which is current.
pub(crate) fn modulus_decimal_scalar(
left: &Decimal128Array,
right: i128,
) -> Result<Decimal128Array> {
// Explicit guard: a zero divisor is reported as an arrow `DivideByZero` error.
if right == 0 {
return Err(DataFusionError::ArrowError(ArrowError::DivideByZero));
}
let array = arith_decimal_scalar(left, right, |left, right| Ok(left % right))?
// A `right` of zero is checked for inside `modulus_scalar`.
let array = modulus_scalar(left, right)?
.with_precision_and_scale(left.precision(), left.scale())?;
Ok(array)
}
Expand Down Expand Up @@ -485,7 +427,6 @@ mod tests {
3,
);
assert_eq!(expect, result);
// modulus
let result = modulus_decimal(&left_decimal_array, &right_decimal_array)?;
let expect =
create_decimal_array(&[Some(7), None, Some(37), Some(16), None], 25, 3);
Expand All @@ -503,9 +444,6 @@ mod tests {
let left_decimal_array = create_decimal_array(&[Some(101)], 10, 1);
let right_decimal_array = create_decimal_array(&[Some(0)], 1, 1);

let err =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this test removed as it is now covered by the new arithmetic_divide_zero test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.
The behavior of dividing a decimal array by a decimal array has changed.

The previous implementation checked for a zero divisor and returned a divide-by-zero error, but with the `divide_opt` kernel from arrow-rs the result is `None` (null) wherever the right-hand value is zero.

divide_opt_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
let err = divide_decimal_scalar(&left_decimal_array, 0).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
let err = modulus_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
Expand Down Expand Up @@ -558,7 +496,7 @@ mod tests {
Some(false),
Some(true),
Some(false),
Some(true)
Some(true),
]),
is_distinct_from(&left_int_array, &right_int_array)?
);
Expand All @@ -570,7 +508,7 @@ mod tests {
Some(true),
Some(false),
Some(true),
Some(false)
Some(false),
]),
is_not_distinct_from(&left_int_array, &right_int_array)?
);
Expand Down