diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 441e8953dffc..a7fbf26febb1 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -47,12 +47,6 @@ pub enum AggregateFunction { Correlation, /// Grouping Grouping, - /// Bit And - BitAnd, - /// Bit Or - BitOr, - /// Bit Xor - BitXor, /// Bool And BoolAnd, /// Bool Or @@ -72,9 +66,6 @@ impl AggregateFunction { NthValue => "NTH_VALUE", Correlation => "CORR", Grouping => "GROUPING", - BitAnd => "BIT_AND", - BitOr => "BIT_OR", - BitXor => "BIT_XOR", BoolAnd => "BOOL_AND", BoolOr => "BOOL_OR", StringAgg => "STRING_AGG", @@ -94,9 +85,6 @@ impl FromStr for AggregateFunction { Ok(match name { // general "avg" => AggregateFunction::Avg, - "bit_and" => AggregateFunction::BitAnd, - "bit_or" => AggregateFunction::BitOr, - "bit_xor" => AggregateFunction::BitXor, "bool_and" => AggregateFunction::BoolAnd, "bool_or" => AggregateFunction::BoolOr, "max" => AggregateFunction::Max, @@ -144,9 +132,6 @@ impl AggregateFunction { // The coerced_data_types is same with input_types. Ok(coerced_data_types[0].clone()) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()), AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { Ok(DataType::Boolean) } @@ -199,11 +184,6 @@ impl AggregateFunction { .collect::>(); Signature::uniform(1, valid, Volatility::Immutable) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => { - Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) - } AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable) } diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index 98324ed6120b..a216c98899fe 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -121,20 +121,6 @@ pub fn coerce_types( }; Ok(vec![v]) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => { - // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc - // smallint, int, bigint, real, double precision, decimal, or interval. - if !is_bit_and_or_xor_support_arg_type(&input_types[0]) { - return plan_err!( - "The function {:?} does not support inputs of type {:?}.", - agg_fun, - input_types[0] - ); - } - Ok(input_types.to_vec()) - } AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc // smallint, int, bigint, real, double precision, decimal, or interval. @@ -350,10 +336,6 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result { } } -pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { - NUMERICS.contains(arg_type) -} - pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool { matches!(arg_type, DataType::Boolean) } diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs new file mode 100644 index 000000000000..19e24f547d8a --- /dev/null +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -0,0 +1,458 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines `BitAnd`, `BitOr`, `BitXor` and `BitXor DISTINCT` aggregate accumulators + +use std::any::Any; +use std::collections::HashSet; +use std::fmt::{Display, Formatter}; + +use ahash::RandomState; +use arrow::array::{downcast_integer, Array, ArrayRef, AsArray}; +use arrow::datatypes::{ + ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, + Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_schema::Field; + +use datafusion_common::cast::as_list_array; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::INTEGERS; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, +}; + +use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign}; + +/// This macro helps create group accumulators based on bitwise operations typically used internally +/// and might not be necessary for users to call directly. +macro_rules! group_accumulator_helper { + ($t:ty, $dt:expr, $opr:expr) => { + match $opr { + BitwiseOperationType::And => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitand_assign(y)) + .with_starting_value(!0), + )), + BitwiseOperationType::Or => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitor_assign(y)), + )), + BitwiseOperationType::Xor => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitxor_assign(y)), + )), + } + }; +} + +/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType, bool) +macro_rules! accumulator_helper { + ($t:ty, $opr:expr, $is_distinct: expr) => { + match $opr { + BitwiseOperationType::And => Ok(Box::>::default()), + BitwiseOperationType::Or => Ok(Box::>::default()), + BitwiseOperationType::Xor => { + if $is_distinct { + Ok(Box::>::default()) + } else { + Ok(Box::>::default()) + } + } + } + }; +} + +/// AND, OR and XOR only supports a subset of numeric types +/// +/// `args` is [AccumulatorArgs] +/// `opr` is [BitwiseOperationType] +/// `is_distinct` is boolean value indicating whether the operation is distinct or not. +macro_rules! downcast_bitwise_accumulator { + ($args:ident, $opr:expr, $is_distinct: expr) => { + match $args.data_type { + DataType::Int8 => accumulator_helper!(Int8Type, $opr, $is_distinct), + DataType::Int16 => accumulator_helper!(Int16Type, $opr, $is_distinct), + DataType::Int32 => accumulator_helper!(Int32Type, $opr, $is_distinct), + DataType::Int64 => accumulator_helper!(Int64Type, $opr, $is_distinct), + DataType::UInt8 => accumulator_helper!(UInt8Type, $opr, $is_distinct), + DataType::UInt16 => accumulator_helper!(UInt16Type, $opr, $is_distinct), + DataType::UInt32 => accumulator_helper!(UInt32Type, $opr, $is_distinct), + DataType::UInt64 => accumulator_helper!(UInt64Type, $opr, $is_distinct), + _ => { + not_impl_err!( + "{} not supported for {}: {}", + stringify!($opr), + $args.name, + $args.data_type + ) + } + } + }; +} + +/// Simplifies the creation of User-Defined Aggregate Functions (UDAFs) for performing bitwise operations in a declarative manner. +/// +/// `EXPR_FN` identifier used to name the generated expression function. +/// `AGGREGATE_UDF_FN` is an identifier used to name the underlying UDAF function. +/// `OPR_TYPE` is an expression that evaluates to the type of bitwise operation to be performed. +macro_rules! make_bitwise_udaf_expr_and_func { + ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => { + make_udaf_expr!( + $EXPR_FN, + expr_x, + concat!( + "Returns the bitwise", + stringify!($OPR_TYPE), + "of a group of values" + ), + $AGGREGATE_UDF_FN + ); + create_func!( + $EXPR_FN, + $AGGREGATE_UDF_FN, + BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN)) + ); + }; +} + +make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, BitwiseOperationType::And); +make_bitwise_udaf_expr_and_func!(bit_or, bit_or_udaf, BitwiseOperationType::Or); +make_bitwise_udaf_expr_and_func!(bit_xor, bit_xor_udaf, BitwiseOperationType::Xor); + +/// The different types of bitwise operations that can be performed. +#[derive(Debug, Clone, Eq, PartialEq)] +enum BitwiseOperationType { + And, + Or, + Xor, +} + +impl Display for BitwiseOperationType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +/// [BitwiseOperation] struct encapsulates information about a bitwise operation. +#[derive(Debug)] +struct BitwiseOperation { + signature: Signature, + /// `operation` indicates the type of bitwise operation to be performed. + operation: BitwiseOperationType, + func_name: &'static str, +} + +impl BitwiseOperation { + pub fn new(operator: BitwiseOperationType, func_name: &'static str) -> Self { + Self { + operation: operator, + signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable), + func_name, + } + } +} + +impl AggregateUDFImpl for BitwiseOperation { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + self.func_name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let arg_type = &arg_types[0]; + if !arg_type.is_integer() { + return exec_err!( + "[return_type] {} not supported for {}", + self.name(), + arg_type + ); + } + Ok(arg_type.clone()) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + downcast_bitwise_accumulator!(acc_args, self.operation, acc_args.is_distinct) + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + if self.operation == BitwiseOperationType::Xor && args.is_distinct { + Ok(vec![Field::new_list( + format_state_name( + args.name, + format!("{} distinct", self.name()).as_str(), + ), + Field::new("item", args.return_type.clone(), true), + false, + )]) + } else { + Ok(vec![Field::new( + format_state_name(args.name, self.name()), + args.return_type.clone(), + true, + )]) + } + } + + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + true + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + let data_type = args.data_type; + let operation = &self.operation; + downcast_integer! { + data_type => (group_accumulator_helper, data_type, operation), + _ => not_impl_err!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + data_type + ), + } + } + + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Identical + } +} + +struct BitAndAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitAndAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitAndAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitAndAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitAndAccumulator +where + T::Native: std::ops::BitAnd, +{ + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_and(values[0].as_primitive::()) { + let v = self.value.get_or_insert(x); + *v = *v & x; + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} + +struct BitOrAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitOrAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitOrAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitOrAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitOrAccumulator +where + T::Native: std::ops::BitOr, +{ + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_or(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v | x; + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} + +struct BitXorAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitXorAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitXorAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitXorAccumulator +where + T::Native: std::ops::BitXor, +{ + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_xor(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v ^ x; + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } +} + +struct DistinctBitXorAccumulator { + values: HashSet, +} + +impl std::fmt::Debug for DistinctBitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for DistinctBitXorAccumulator { + fn default() -> Self { + Self { + values: HashSet::default(), + } + } +} + +impl Accumulator for DistinctBitXorAccumulator +where + T::Native: std::ops::BitXor + std::hash::Hash + Eq, +{ + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let array = values[0].as_primitive::(); + match array.nulls().filter(|x| x.null_count() > 0) { + Some(n) => { + for idx in n.valid_indices() { + self.values.insert(array.value(idx)); + } + } + None => array.values().iter().for_each(|x| { + self.values.insert(*x); + }), + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let mut acc = T::Native::usize_as(0); + for distinct_value in self.values.iter() { + acc = acc ^ *distinct_value; + } + let v = (!self.values.is_empty()).then_some(acc); + ScalarValue::new_primitive::(v, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + + self.values.capacity() * std::mem::size_of::() + } + + fn state(&mut self) -> Result> { + // 1. Stores aggregate state in `ScalarValue::List` + // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set + let state_out = { + let values = self + .values + .iter() + .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) + .collect::>>()?; + + let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); + vec![ScalarValue::List(arr)] + }; + Ok(state_out) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if let Some(state) = states.first() { + let list_arr = as_list_array(state)?; + for arr in list_arr.iter().flatten() { + self.update_batch(&[arr])?; + } + } + Ok(()) + } +} diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index daddb9d93f78..990303bd1de3 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -69,6 +69,7 @@ pub mod variance; pub mod approx_median; pub mod approx_percentile_cont; pub mod approx_percentile_cont_with_weight; +pub mod bit_and_or_xor; use crate::approx_percentile_cont::approx_percentile_cont_udaf; use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf; @@ -84,6 +85,9 @@ pub mod expr_fn { pub use super::approx_median::approx_median; pub use super::approx_percentile_cont::approx_percentile_cont; pub use super::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight; + pub use super::bit_and_or_xor::bit_and; + pub use super::bit_and_or_xor::bit_or; + pub use super::bit_and_or_xor::bit_xor; pub use super::count::count; pub use super::count::count_distinct; pub use super::covariance::covar_pop; @@ -134,6 +138,9 @@ pub fn all_default_aggregate_functions() -> Vec> { approx_distinct::approx_distinct_udaf(), approx_percentile_cont_udaf(), approx_percentile_cont_with_weight_udaf(), + bit_and_or_xor::bit_and_udaf(), + bit_and_or_xor::bit_or_udaf(), + bit_and_or_xor::bit_xor_udaf(), ] } diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs deleted file mode 100644 index 3fa225c5e479..000000000000 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ /dev/null @@ -1,695 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators - -use ahash::RandomState; -use datafusion_common::cast::as_list_array; -use std::any::Any; -use std::sync::Arc; - -use crate::{AggregateExpr, PhysicalExpr}; -use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::Field}; -use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_expr::{Accumulator, GroupsAccumulator}; -use std::collections::HashSet; - -use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::utils::down_cast_any_ref; -use crate::expressions::format_state_name; -use arrow::array::Array; -use arrow::compute::{bit_and, bit_or, bit_xor}; -use arrow_array::cast::AsArray; -use arrow_array::{downcast_integer, ArrowNumericType}; -use arrow_buffer::ArrowNativeType; - -/// BIT_AND aggregate expression -#[derive(Debug, Clone)] -pub struct BitAnd { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitAnd { - /// Create a new BIT_AND aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitAnd { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitAndAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_and"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitAndAssign; - - // Note the default value for BitAnd should be all set, i.e. `!0` - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new( - PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| { - x.bitand_assign(y) - }) - .with_starting_value(!0), - )) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitAnd { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitAndAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitAndAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitAndAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitAndAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitAndAccumulator -where - T::Native: std::ops::BitAnd, -{ - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_and(values[0].as_primitive::()) { - let v = self.value.get_or_insert(x); - *v = *v & x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// BIT_OR aggregate expression -#[derive(Debug, Clone)] -pub struct BitOr { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitOr { - /// Create a new BIT_OR aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitOr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitOrAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_or"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitOrAssign; - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( - $dt, - |x, y| x.bitor_assign(y), - ))) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitOr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitOrAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitOrAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitOrAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitOrAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitOrAccumulator -where - T::Native: std::ops::BitOr, -{ - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_or(values[0].as_primitive::()) { - let v = self.value.get_or_insert(T::Native::usize_as(0)); - *v = *v | x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// BIT_XOR aggregate expression -#[derive(Debug, Clone)] -pub struct BitXor { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitXor { - /// Create a new BIT_XOR aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitXor { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitXor not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_xor"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitXorAssign; - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( - $dt, - |x, y| x.bitxor_assign(y), - ))) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitXor { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitXorAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitXorAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitXorAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitXorAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitXorAccumulator -where - T::Native: std::ops::BitXor, -{ - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_xor(values[0].as_primitive::()) { - let v = self.value.get_or_insert(T::Native::usize_as(0)); - *v = *v ^ x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// Expression for a BIT_XOR(DISTINCT) aggregation. -#[derive(Debug, Clone)] -pub struct DistinctBitXor { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl DistinctBitXor { - /// Create a new DistinctBitXor aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for DistinctBitXor { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "DistinctBitXorAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - // State field is a List which stores items to rebuild hash set. - Ok(vec![Field::new_list( - format_state_name(&self.name, "bit_xor distinct"), - Field::new("item", self.data_type.clone(), true), - false, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } -} - -impl PartialEq for DistinctBitXor { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct DistinctBitXorAccumulator { - values: HashSet, -} - -impl std::fmt::Debug for DistinctBitXorAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for DistinctBitXorAccumulator { - fn default() -> Self { - Self { - values: HashSet::default(), - } - } -} - -impl Accumulator for DistinctBitXorAccumulator -where - T::Native: std::ops::BitXor + std::hash::Hash + Eq, -{ - fn state(&mut self) -> Result> { - // 1. Stores aggregate state in `ScalarValue::List` - // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set - let state_out = { - let values = self - .values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) - .collect::>>()?; - - let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); - vec![ScalarValue::List(arr)] - }; - Ok(state_out) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array = values[0].as_primitive::(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.values.insert(array.value(idx)); - } - } - None => array.values().iter().for_each(|x| { - self.values.insert(*x); - }), - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if let Some(state) = states.first() { - let list_arr = as_list_array(state)?; - for arr in list_arr.iter().flatten() { - self.update_batch(&[arr])?; - } - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - let mut acc = T::Native::usize_as(0); - for distinct_value in self.values.iter() { - acc = acc ^ *distinct_value; - } - let v = (!self.values.is_empty()).then_some(acc); - ScalarValue::new_primitive::(v, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - + self.values.capacity() * std::mem::size_of::() - } -} diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index a1f5f153a9ff..6c01decdbf95 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -66,26 +66,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitXor, false) => Arc::new(expressions::BitXor::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitXor, true) => Arc::new(expressions::DistinctBitXor::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new( input_phy_exprs[0].clone(), name, @@ -202,12 +182,10 @@ mod tests { use datafusion_expr::{type_coercion, Signature}; use crate::expressions::{ - try_cast, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, - DistinctArrayAgg, Max, Min, + try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min, }; use super::*; - #[test] fn test_approx_expr() -> Result<()> { let funcs = vec![AggregateFunction::ArrayAgg]; @@ -319,60 +297,6 @@ mod tests { Ok(()) } - #[test] - fn test_bit_and_or_xor_expr() -> Result<()> { - let funcs = vec![ - AggregateFunction::BitAnd, - AggregateFunction::BitOr, - AggregateFunction::BitXor, - ]; - let data_types = vec![DataType::UInt64, DataType::Int64]; - for fun in funcs { - for data_type in &data_types { - let input_schema = - Schema::new(vec![Field::new("c1", data_type.clone(), true)]); - let input_phy_exprs: Vec> = vec![Arc::new( - expressions::Column::new_with_schema("c1", &input_schema).unwrap(), - )]; - let result_agg_phy_exprs = create_physical_agg_expr_for_test( - &fun, - false, - &input_phy_exprs[0..1], - &input_schema, - "c1", - )?; - match fun { - AggregateFunction::BitAnd => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - AggregateFunction::BitOr => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - AggregateFunction::BitXor => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - _ => {} - }; - } - } - Ok(()) - } - #[test] fn test_bool_and_or_expr() -> Result<()> { let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr]; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index c20902c11b86..0b1f5f577435 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -21,7 +21,6 @@ pub(crate) mod array_agg; pub(crate) mod array_agg_distinct; pub(crate) mod array_agg_ordered; pub(crate) mod average; -pub(crate) mod bit_and_or_xor; pub(crate) mod bool_and_or; pub(crate) mod correlation; pub(crate) mod covariance; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index b9a159b21e3d..bffaafd7dac2 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -40,7 +40,6 @@ pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; pub use crate::aggregate::average::Avg; pub use crate::aggregate::average::AvgAccumulator; -pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor}; pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr}; pub use crate::aggregate::build_in::create_aggregate_expr; pub use crate::aggregate::correlation::Correlation; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e5578ae62f3e..ae4445eaa8ce 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -491,9 +491,9 @@ enum AggregateFunction { // APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; GROUPING = 17; // MEDIAN = 18; - BIT_AND = 19; - BIT_OR = 20; - BIT_XOR = 21; + // BIT_AND = 19; + // BIT_OR = 20; + // BIT_XOR = 21; BOOL_AND = 22; BOOL_OR = 23; // REGR_SLOPE = 26; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 4a7b9610e5bc..243c75435f8d 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -538,9 +538,6 @@ impl serde::Serialize for AggregateFunction { Self::ArrayAgg => "ARRAY_AGG", Self::Correlation => "CORRELATION", Self::Grouping => "GROUPING", - Self::BitAnd => "BIT_AND", - Self::BitOr => "BIT_OR", - Self::BitXor => "BIT_XOR", Self::BoolAnd => "BOOL_AND", Self::BoolOr => "BOOL_OR", Self::StringAgg => "STRING_AGG", @@ -562,9 +559,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "ARRAY_AGG", "CORRELATION", "GROUPING", - "BIT_AND", - "BIT_OR", - "BIT_XOR", "BOOL_AND", "BOOL_OR", "STRING_AGG", @@ -615,9 +609,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg), "CORRELATION" => Ok(AggregateFunction::Correlation), "GROUPING" => Ok(AggregateFunction::Grouping), - "BIT_AND" => Ok(AggregateFunction::BitAnd), - "BIT_OR" => Ok(AggregateFunction::BitOr), - "BIT_XOR" => Ok(AggregateFunction::BitXor), "BOOL_AND" => Ok(AggregateFunction::BoolAnd), "BOOL_OR" => Ok(AggregateFunction::BoolOr), "STRING_AGG" => Ok(AggregateFunction::StringAgg), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ffaef445d668..1172eccb90fd 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1945,9 +1945,9 @@ pub enum AggregateFunction { /// APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; Grouping = 17, /// MEDIAN = 18; - BitAnd = 19, - BitOr = 20, - BitXor = 21, + /// BIT_AND = 19; + /// BIT_OR = 20; + /// BIT_XOR = 21; BoolAnd = 22, BoolOr = 23, /// REGR_SLOPE = 26; @@ -1975,9 +1975,6 @@ impl AggregateFunction { AggregateFunction::ArrayAgg => "ARRAY_AGG", AggregateFunction::Correlation => "CORRELATION", AggregateFunction::Grouping => "GROUPING", - AggregateFunction::BitAnd => "BIT_AND", - AggregateFunction::BitOr => "BIT_OR", - AggregateFunction::BitXor => "BIT_XOR", AggregateFunction::BoolAnd => "BOOL_AND", AggregateFunction::BoolOr => "BOOL_OR", AggregateFunction::StringAgg => "STRING_AGG", @@ -1993,9 +1990,6 @@ impl AggregateFunction { "ARRAY_AGG" => Some(Self::ArrayAgg), "CORRELATION" => Some(Self::Correlation), "GROUPING" => Some(Self::Grouping), - "BIT_AND" => Some(Self::BitAnd), - "BIT_OR" => Some(Self::BitOr), - "BIT_XOR" => Some(Self::BitXor), "BOOL_AND" => Some(Self::BoolAnd), "BOOL_OR" => Some(Self::BoolOr), "STRING_AGG" => Some(Self::StringAgg), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 25b7413a984a..43cc352f98dd 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -140,9 +140,6 @@ impl From for AggregateFunction { protobuf::AggregateFunction::Min => Self::Min, protobuf::AggregateFunction::Max => Self::Max, protobuf::AggregateFunction::Avg => Self::Avg, - protobuf::AggregateFunction::BitAnd => Self::BitAnd, - protobuf::AggregateFunction::BitOr => Self::BitOr, - protobuf::AggregateFunction::BitXor => Self::BitXor, protobuf::AggregateFunction::BoolAnd => Self::BoolAnd, protobuf::AggregateFunction::BoolOr => Self::BoolOr, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index d9548325dac3..33a58daeaf0a 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -111,9 +111,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::Min => Self::Min, AggregateFunction::Max => Self::Max, AggregateFunction::Avg => Self::Avg, - AggregateFunction::BitAnd => Self::BitAnd, - AggregateFunction::BitOr => Self::BitOr, - AggregateFunction::BitXor => Self::BitXor, AggregateFunction::BoolAnd => Self::BoolAnd, AggregateFunction::BoolOr => Self::BoolOr, AggregateFunction::ArrayAgg => Self::ArrayAgg, @@ -380,9 +377,6 @@ pub fn serialize_expr( AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg, AggregateFunction::Min => protobuf::AggregateFunction::Min, AggregateFunction::Max => protobuf::AggregateFunction::Max, - AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd, - AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr, - AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor, AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd, AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr, AggregateFunction::Avg => protobuf::AggregateFunction::Avg, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 3a4c35a93e16..886179bf5627 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -23,11 +23,11 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - ArrayAgg, Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, CaseExpr, - CastExpr, Column, Correlation, CumeDist, DistinctArrayAgg, DistinctBitXor, Grouping, - InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, - NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, - StringAgg, TryCastExpr, WindowShift, + ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, + CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, + Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, + OrderSensitiveArrayAgg, Rank, RankType, RowNumber, StringAgg, TryCastExpr, + WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -241,15 +241,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { let inner = if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Grouping - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitAnd - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitOr - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitXor - } else if aggr_expr.downcast_ref::().is_some() { - distinct = true; - protobuf::AggregateFunction::BitXor } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::BoolAnd } else if aggr_expr.downcast_ref::().is_some() { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index a496e226855a..52696a106183 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -59,6 +59,7 @@ use datafusion_expr::{ TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; +use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor}; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec, @@ -665,6 +666,9 @@ async fn roundtrip_expr_api() -> Result<()> { approx_median(lit(2)), approx_percentile_cont(lit(2), lit(0.5)), approx_percentile_cont_with_weight(lit(2), lit(1), lit(0.5)), + bit_and(lit(2)), + bit_or(lit(2)), + bit_xor(lit(2)), ]; // ensure expressions created with the expr api can be round tripped