Skip to content

Commit

Permalink
revert removed test in array distinct
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Nov 11, 2023
1 parent 9f75d9c commit 49a671f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
39 changes: 38 additions & 1 deletion datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,43 @@ impl ScalarValue {
}
}

/// Convert `array` into nested vectors of `ScalarValue`s.
///
/// When `array` has type `DataType::List`, the conversion is delegated to
/// `ScalarValue::convert_list_array_to_scalar_vec`. For every other data type
/// the scalars produced by `ScalarValue::convert_non_list_array_to_scalars`
/// are wrapped in an outer vector with exactly one element.
///
/// # Errors
///
/// Propagates any error returned by the conversion helper that handles
/// `array`.
///
/// # Examples
///
/// ```
/// use datafusion_common::ScalarValue;
/// use arrow::array::ListArray;
/// use arrow::datatypes::{DataType, Int32Type};
///
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
///     Some(vec![Some(1), Some(2), Some(3)]),
///     None,
///     Some(vec![Some(4), Some(5)])
/// ]);
///
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let expected = vec![
///     vec![
///         ScalarValue::Int32(Some(1)),
///         ScalarValue::Int32(Some(2)),
///         ScalarValue::Int32(Some(3)),
///     ],
///     vec![],
///     vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(5))]
/// ];
///
/// assert_eq!(scalar_vec, expected);
/// ```
pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result<Vec<Vec<Self>>> {
    // Only the variant is inspected, so match on the borrowed `DataType`
    // instead of creating an owned copy of it first.
    match array.data_type() {
        DataType::List(_) => Self::convert_list_array_to_scalar_vec(array),
        _ => Ok(vec![Self::convert_non_list_array_to_scalars(array)?]),
    }
}

// TODO: Support more types once the other ScalarValue variants are wrapped with ArrayRef
/// Get raw data (inner array) inside ScalarValue
pub fn raw_data(&self) -> Result<ArrayRef> {
Expand Down Expand Up @@ -3088,7 +3125,7 @@ mod tests {

let l12 = arrays_into_list_array([l1, l2]).unwrap();
let arr = Arc::new(l12) as ArrayRef;

let actual = ScalarValue::convert_list_array_to_scalar_vec(&arr).unwrap();
let expected = vec![
vec![
Expand Down
21 changes: 1 addition & 20 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,10 @@ impl Accumulator for DistinctArrayAggAccumulator {
Ok(vec![self.evaluate()?])
}

// fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// assert_eq!(values.len(), 1, "batch input should only include 1 column!");

// let scalars = ScalarValue::convert_non_list_array_to_scalars(&values[0])?;
// self.values.extend(scalars);
// Ok(())
// }
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
assert_eq!(values.len(), 1, "batch input should only include 1 column!");

let array = &values[0];
println!("array: {:?}", array);
let scalars = ScalarValue::convert_array_to_scalar_vec(array)?;
for scalar in scalars {
self.values.extend(scalar)
Expand All @@ -157,18 +149,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}

assert_eq!(
states.len(),
1,
"array_agg_distinct states must contain single array"
);

let scalar_vec = ScalarValue::convert_list_array_to_scalar_vec(&states[0])?;
for scalars in scalar_vec {
self.values.extend(scalars)
}

Ok(())
self.update_batch(states)
}

fn evaluate(&self) -> Result<ScalarValue> {
Expand Down

0 comments on commit 49a671f

Please sign in to comment.