Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Add multi ordering test for array agg order #8439

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions datafusion/core/tests/data/aggregate_agg_multi_order.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
c1,c2,c3
1,20,0
2,20,1
3,10,2
4,10,3
5,30,4
6,30,5
7,30,6
8,30,7
9,30,8
10,10,9
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we need a CSV-based test is that only reading from a CSV file triggers `merge_batch`; a normal in-memory table goes through `update_batch` only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normal table

logical_plan
Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(arrays.column1) ORDER BY [arrays.column1 ASC NULLS LAST]]]
--TableScan: arrays projection=[column1]
physical_plan
AggregateExec: mode=Single, gby=[], aggr=[ARRAY_AGG(arrays.column1)]
--SortExec: expr=[column1@0 ASC NULLS LAST]
----MemoryExec: partitions=1, partition_sizes=[1]

The AggregateExec mode is Single, so it runs `update_batch`.

49 changes: 19 additions & 30 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_schema::{Fields, SortOptions};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -214,7 +214,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// values received from its ordering requirement expression. (This information is necessary for during merging).
let agg_orderings = &states[1];

if as_list_array(agg_orderings).is_ok() {
if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
// Stores ARRAY_AGG results coming from each partition
let mut partition_values = vec![];
// Stores ordering requirement expression results coming from each partition
Expand All @@ -232,10 +232,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
// Ordering requirement expression values for each entry in the ARRAY_AGG list
let other_ordering_values = self.convert_array_agg_to_orderings(orderings)?;
for v in other_ordering_values.into_iter() {
partition_ordering_values.push(v);

for partition_ordering_rows in orderings.into_iter() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think `convert_array_agg_to_orderings` was doing something quite specific to this call site, so I removed the function and inlined its logic here.

// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;

partition_ordering_values.push(ordering_value);
}

let sort_options = self
Expand Down Expand Up @@ -293,33 +304,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}

impl OrderSensitiveArrayAggAccumulator {
/// Inner Vec\<ScalarValue> in the ordering_values can be thought as ordering information for the each ScalarValue in the values array.
/// See [`merge_ordered_arrays`] for more information.
fn convert_array_agg_to_orderings(
&self,
array_agg: Vec<Vec<ScalarValue>>,
) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
let mut orderings = vec![];
// in_data is Vec<ScalarValue> where ScalarValue does not include ScalarValue::List
for in_data in array_agg.into_iter() {
let ordering = in_data.into_iter().map(|struct_vals| {
if let ScalarValue::Struct(Some(orderings), _) = struct_vals {
Ok(orderings)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
struct_vals.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;
orderings.push(ordering);
}
Ok(orderings)
}

fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields.clone());

let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
Expand All @@ -329,6 +317,7 @@ impl OrderSensitiveArrayAggAccumulator {
.collect();
let struct_type = DataType::Struct(Fields::from(fields));

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))

This comment was marked as outdated.

}
Expand Down
30 changes: 30 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,36 @@ FROM
----
[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8]

statement ok
CREATE EXTERNAL TABLE agg_order (
c1 INT NOT NULL,
c2 INT NOT NULL,
c3 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';

# test array_agg with order by multiple columns
query ?
select array_agg(c1 order by c2 desc, c3) from agg_order;
----
[5, 6, 7, 8, 9, 1, 2, 3, 4, 10]

query TT
explain select array_agg(c1 order by c2 desc, c3) from agg_order;
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AggregateExec mode is Final, so it runs `merge_batch`.

--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true

statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100

Expand Down