Skip to content

Commit

Permalink
fix: fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Oct 3, 2024
1 parent a52cefe commit a0a1572
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 36 deletions.
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate-common/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
// under the License.

pub mod count_distinct;
pub mod groups_accumulator;
pub mod groups_accumulator;
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,32 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray};
use arrow::array::{
Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
use std::sync::Arc;

pub struct StringGroupsAccumulator<F, const VIEW: bool> {
states: Vec<String>,
fun: F
fun: F,
}

impl<F, const VIEW: bool> StringGroupsAccumulator<F, VIEW>
where
impl<F, const VIEW: bool> StringGroupsAccumulator<F, VIEW>
where
F: Fn(&[u8], &[u8]) -> bool + Send + Sync,
{
pub fn new(s_fn: F) -> Self {
Self {
Self {
states: Vec::new(),
fun: s_fn
fun: s_fn,
}
}
}

impl<F, const VIEW: bool> GroupsAccumulator for StringGroupsAccumulator<F, VIEW>
where
impl<F, const VIEW: bool> GroupsAccumulator for StringGroupsAccumulator<F, VIEW>
where
F: Fn(&[u8], &[u8]) -> bool + Send + Sync,
{
fn update_batch(
Expand All @@ -55,7 +57,7 @@ where
for (i, &group_index) in group_indices.iter().enumerate() {
invoke_accumulator::<F, VIEW>(self, input_array, opt_filter, group_index, i)
}

Ok(())
}

Expand Down Expand Up @@ -155,7 +157,7 @@ where
}

let filter = opt_filter.unwrap();

let array = if VIEW {
let mut builder = BinaryViewBuilder::new();

Expand Down Expand Up @@ -208,31 +210,33 @@ where
}
}

fn invoke_accumulator<F, const VIEW: bool>(accumulator: &mut StringGroupsAccumulator<F, VIEW>, input_array: &ArrayRef, opt_filter: Option<&BooleanArray>, group_index: usize, i: usize)
where
F: Fn(&[u8], &[u8]) -> bool + Send + Sync
fn invoke_accumulator<F, const VIEW: bool>(
accumulator: &mut StringGroupsAccumulator<F, VIEW>,
input_array: &ArrayRef,
opt_filter: Option<&BooleanArray>,
group_index: usize,
i: usize,
) where
F: Fn(&[u8], &[u8]) -> bool + Send + Sync,
{
if let Some(filter) = opt_filter {
if !filter.value(i) {
return
return;
}
}
if input_array.is_null(i) {
return
return;
}

let value: &[u8] = if VIEW {
input_array.as_binary_view().value(i)
} else {
input_array.as_binary::<i32>().value(i)
};

let value_str = std::str::from_utf8(value).map_err(|e| {
DataFusionError::Execution(format!(
"could not build utf8 {}",
e
))
}).expect("failed to build utf8");
let value_str = std::str::from_utf8(value)
.map_err(|e| DataFusionError::Execution(format!("could not build utf8 {}", e)))
.expect("failed to build utf8");

if accumulator.states[group_index].is_empty() {
accumulator.states[group_index] = value_str.to_string();
Expand Down
21 changes: 8 additions & 13 deletions datafusion/functions-aggregate/src/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ macro_rules! instantiate_max_accumulator {

macro_rules! instantiate_max_string_accumulator {
($VIEW:expr) => {{
Ok(Box::new(
StringGroupsAccumulator::<_, $VIEW>::new(|a, b| {
a > b
})
))
Ok(Box::new(StringGroupsAccumulator::<_, $VIEW>::new(
|a, b| a > b,
)))
}};
}

Expand All @@ -158,14 +156,11 @@ macro_rules! instantiate_min_accumulator {
}};
}


macro_rules! instantiate_min_string_accumulator {
($VIEW:expr) => {{
Ok(Box::new(
StringGroupsAccumulator::<_, $VIEW>::new(|a, b| {
a < b
})
))
Ok(Box::new(StringGroupsAccumulator::<_, $VIEW>::new(
|a, b| a < b,
)))
}};
}

Expand Down Expand Up @@ -1065,10 +1060,10 @@ impl AggregateUDFImpl for Min {
}
BinaryView => {
instantiate_min_string_accumulator!(true)
},
}
Binary => {
instantiate_min_string_accumulator!(false)
},
}

// It would be nice to have a fast implementation for Strings as well
// https://github.com/apache/datafusion/issues/6906
Expand Down

0 comments on commit a0a1572

Please sign in to comment.