Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Dec 14, 2023
1 parent c1bf028 commit a006450
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 29 deletions.
22 changes: 22 additions & 0 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,24 @@ where
Ok(())
}

fn hash_struct_array(
array: &StructArray,
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// let fields = array.fields();
// let columns = fields
// .iter()
// .map(|field| array.column_by_name(field.name()).unwrap())
// .collect::<Vec<_>>();
// let mut columns_hashes = vec![0u64; array.len()];
// create_hashes(&columns, random_state, &mut columns_hashes)?;
// for (i, hash) in hashes_buffer.iter_mut().enumerate() {
// *hash = columns_hashes[i];
// }
Ok(())
}

/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
Expand Down Expand Up @@ -335,6 +353,10 @@ pub fn create_hashes<'a>(
let array = as_large_list_array(array);
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Struct(_) => {
let array = array.as_struct();
hash_struct_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
Expand Down
99 changes: 70 additions & 29 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// values received from its ordering requirement expression. (This information is necessary for during merging).
let agg_orderings = &states[1];

println!(
"array_agg_values: {:?}, agg_orderings: {:?}",
array_agg_values, agg_orderings
);
// println!(
// "array_agg_values: {:?}, agg_orderings: {:?}",
// array_agg_values, agg_orderings
// );

if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
// Stores ARRAY_AGG results coming from each partition
Expand All @@ -233,31 +233,72 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// Existing values should be merged also.
partition_values.push(self.values.clone());
partition_ordering_values.push(self.ordering_values.clone());
println!("(1) partition_ordering_values: {:?}", partition_ordering_values);
// println!("(1) partition_ordering_values: {:?}", partition_ordering_values);

assert!(as_list_array(array_agg_values).is_ok());
// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res =
ScalarValue::convert_list_array_to_scalar_vec(array_agg_values)?;
partition_values.extend(array_agg_res);

println!("agg_orderings: {:?}", agg_orderings);
let mut field_values: Vec<Vec<ScalarValue>> = Vec::new();
assert_eq!(agg_orderings.len(), 1);
let agg_ordering_columns = agg_orderings.values();
let agg_ordering_columns = as_struct_array(agg_ordering_columns)?;
let num_columns = agg_ordering_columns.num_columns();
for col_index in 0..num_columns {
let col_array = agg_ordering_columns.column(col_index);
println!("col_array: {:?}", col_array);
let col_scalar =
ScalarValue::convert_non_list_array_to_scalars(&col_array)?;
println!("col_scalar: {:?}", col_scalar);
field_values.push(col_scalar);
// println!("agg_orderings: {:?}", agg_orderings);

let orderings = ScalarValue::convert_list_array_to_scalar_vec(agg_orderings)?;
// println!("orderings: {:?}", orderings);
for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(ordering_columns_per_row) = ordering_row {
// Struct([StructArray
// [
// -- child 0: "c2@1 DESC" (Int32)
// PrimitiveArray<Int32>
// [
// 30,
// ]
// -- child 1: "c3@2 ASC NULLS LAST" (Int32)
// PrimitiveArray<Int32>
// [
// 4,
// ]
// ]]
let s = ordering_columns_per_row.as_struct();
let num_columns = s.num_columns();
let mut ordering_columns_per_row = vec![];
for col_index in 0..num_columns {
let col_array = s.column(col_index);
let sv = ScalarValue::try_from_array(col_array, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;
partition_ordering_values.push(ordering_value);
}

partition_ordering_values.push(field_values);
println!("partition_ordering_values: {:?}", partition_ordering_values);

// let mut field_values: Vec<Vec<ScalarValue>> = Vec::new();
// assert_eq!(agg_orderings.len(), 1);
// let agg_ordering_columns = agg_orderings.values();
// let agg_ordering_columns = as_struct_array(agg_ordering_columns)?;
// let num_columns = agg_ordering_columns.num_columns();
// for col_index in 0..num_columns {
// let col_array = agg_ordering_columns.column(col_index);
// println!("col_array: {:?}", col_array);
// let col_scalar =
// ScalarValue::convert_non_list_array_to_scalars(&col_array)?;
// println!("col_scalar: {:?}", col_scalar);
// field_values.push(col_scalar);
// }

// partition_ordering_values.push(field_values);
// println!("partition_ordering_values: {:?}", partition_ordering_values);

let sort_options = self
.ordering_req
Expand All @@ -280,15 +321,15 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate()?];
result.push(self.evaluate_orderings()?);
println!("result: {:?}", result);
// println!("result: {:?}", result);
Ok(result)
}

fn evaluate(&self) -> Result<ScalarValue> {
let arr = ScalarValue::new_list(&self.values, &self.datatypes[0]);
println!("self.vales: {:?}", self.values);
println!("arr: {:?}", arr);
println!("self.datatypes: {:?}", self.datatypes);
// println!("self.vales: {:?}", self.values);
// println!("arr: {:?}", arr);
// println!("self.datatypes: {:?}", self.datatypes);

Ok(ScalarValue::List(arr))
}
Expand Down Expand Up @@ -321,10 +362,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
println!("fields: {:?}", fields);
// println!("fields: {:?}", fields);
let struct_field = Fields::from(fields.clone());
println!("struct_field: {:?}", struct_field);
println!("self.ordering_values: {:?}", self.ordering_values);
// println!("struct_field: {:?}", struct_field);
// println!("self.ordering_values: {:?}", self.ordering_values);
// let arr_vec = self
// .ordering_values
// .iter()
Expand All @@ -340,7 +381,7 @@ impl OrderSensitiveArrayAggAccumulator {
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
println!("column_values: {:?}", column_values);
// println!("column_values: {:?}", column_values);
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
Expand All @@ -350,7 +391,7 @@ impl OrderSensitiveArrayAggAccumulator {
}

let ordering_array = StructArray::try_new(struct_field.clone(), column_wise_ordering_values, None)?;
println!("ordering_array: {:?}", ordering_array);
// println!("ordering_array: {:?}", ordering_array);
let a = array_into_list_array(Arc::new(ordering_array));
let a = Arc::new(a);
Ok(ScalarValue::List(a))
Expand Down

0 comments on commit a006450

Please sign in to comment.