Minor: Add multi ordering test for array agg order #8439
New test data file `aggregate_agg_multi_order.csv`:
@@ -0,0 +1,11 @@
c1,c2,c3
1,20,0
2,20,1
3,10,2
4,10,3
5,30,4
6,30,5
7,30,6
8,30,7
9,30,8
10,10,9
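For reference, ordering these rows by `c2` descending and then `c3` ascending yields `c1` in the order 5, 6, 7, 8, 9 (c2 = 30), then 1, 2 (c2 = 20), then 3, 4, 10 (c2 = 10), which matches the expected `array_agg` result in the new sqllogictest below.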
Changes to the order-sensitive ARRAY_AGG accumulator (`OrderSensitiveArrayAggAccumulator`):
@@ -30,9 +30,9 @@ use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_schema::{Fields, SortOptions};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
@@ -214,7 +214,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// values received from its ordering requirement expression. (This information is necessary during merging.)
let agg_orderings = &states[1];

if as_list_array(agg_orderings).is_ok() {
if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
// Stores ARRAY_AGG results coming from each partition
let mut partition_values = vec![];
// Stores ordering requirement expression results coming from each partition
@@ -232,10 +232,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
// Ordering requirement expression values for each entry in the ARRAY_AGG list
let other_ordering_values = self.convert_array_agg_to_orderings(orderings)?;
for v in other_ordering_values.into_iter() {
partition_ordering_values.push(v);

for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;

partition_ordering_values.push(ordering_value);
}

let sort_options = self
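The loop above inlines what the `convert_array_agg_to_orderings` helper (removed in the next hunk) used to do: each partition's ordering state arrives as a list of `ScalarValue::Struct` rows, and each struct is unpacked into that row's ordering values. Below is a minimal standalone sketch of that unpacking, assuming the `ScalarValue::Struct(Option<Vec<ScalarValue>>, _)` variant DataFusion uses at this point; the function name is illustrative, not from the PR.

```rust
use datafusion_common::{exec_err, Result, ScalarValue};

// Illustrative helper: turn one partition's ordering rows, each a
// ScalarValue::Struct over the ORDER BY expressions (e.g. (c2, c3)),
// into a Vec of per-row ordering values.
fn unpack_ordering_rows(rows: Vec<ScalarValue>) -> Result<Vec<Vec<ScalarValue>>> {
    rows.into_iter()
        .map(|row| match row {
            // Each struct holds one value per ORDER BY expression.
            ScalarValue::Struct(Some(columns), _) => Ok(columns),
            other => exec_err!(
                "Expects to receive ScalarValue::Struct(Some(..), _) but got: {:?}",
                other.data_type()
            ),
        })
        .collect()
}
```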
@@ -293,33 +304,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}
impl OrderSensitiveArrayAggAccumulator {
/// The inner Vec\<ScalarValue> in the ordering_values can be thought of as ordering information for each ScalarValue in the values array.
/// See [`merge_ordered_arrays`] for more information.
fn convert_array_agg_to_orderings(
&self,
array_agg: Vec<Vec<ScalarValue>>,
) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
let mut orderings = vec![];
// in_data is Vec<ScalarValue> where ScalarValue does not include ScalarValue::List
for in_data in array_agg.into_iter() {
let ordering = in_data.into_iter().map(|struct_vals| {
if let ScalarValue::Struct(Some(orderings), _) = struct_vals {
Ok(orderings)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
struct_vals.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;
orderings.push(ordering);
}
Ok(orderings)
}

fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields.clone());

let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
@@ -329,6 +317,7 @@ impl OrderSensitiveArrayAggAccumulator {
.collect();
let struct_type = DataType::Struct(Fields::from(fields));

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))
}
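For reference, here is a hedged sketch (not part of the PR) of what the `ListArray(StructArray)` layout mentioned in the comment above looks like when built by hand. The field names and values come from the new test data; the `Struct(Option<Vec<ScalarValue>>, Fields)` constructor and `Int32` column types are assumptions based on this version of DataFusion.

```rust
use arrow::datatypes::{DataType, Field};
use arrow_schema::Fields;
use datafusion_common::ScalarValue;

// Build an ordering state for two rows of the new test data:
// (c2, c3) = (20, 0) and (20, 1).
fn orderings_as_list_scalar() -> ScalarValue {
    let fields = Fields::from(vec![
        Field::new("c2", DataType::Int32, true),
        Field::new("c3", DataType::Int32, true),
    ]);
    let rows = vec![
        ScalarValue::Struct(
            Some(vec![ScalarValue::Int32(Some(20)), ScalarValue::Int32(Some(0))]),
            fields.clone(),
        ),
        ScalarValue::Struct(
            Some(vec![ScalarValue::Int32(Some(20)), ScalarValue::Int32(Some(1))]),
            fields.clone(),
        ),
    ];
    let struct_type = DataType::Struct(fields);
    // Wrap the struct rows in a single ListArray, mirroring evaluate_orderings above.
    ScalarValue::List(ScalarValue::new_list(&rows, &struct_type))
}
```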
Changes to the aggregate sqllogictest file:
@@ -106,6 +106,36 @@ FROM
----
[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8]

statement ok
CREATE EXTERNAL TABLE agg_order (
c1 INT NOT NULL,
c2 INT NOT NULL,
c3 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';

# test array_agg with order by multiple columns
query ?
select array_agg(c1 order by c2 desc, c3) from agg_order;
----
[5, 6, 7, 8, 9, 1, 2, 3, 4, 10]
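Outside of sqllogictest, the same query can be reproduced through the DataFusion API. A minimal sketch, assuming the CSV file is readable from the (illustrative) relative path used below:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the new test file under the same table name used in the test.
    ctx.register_csv(
        "agg_order",
        "datafusion/core/tests/data/aggregate_agg_multi_order.csv",
        CsvReadOptions::new().has_header(true),
    )
    .await?;

    // Expected output: [5, 6, 7, 8, 9, 1, 2, 3, 4, 10]
    let df = ctx
        .sql("select array_agg(c1 order by c2 desc, c3) from agg_order")
        .await?;
    df.show().await?;
    Ok(())
}
```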
query TT
explain select array_agg(c1 order by c2 desc, c3) from agg_order;
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true

Review comment (on the `AggregateExec: mode=Final` line): the top-level AggregateExec mode is Final, so it runs `merge_batch`.

statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100
Review comment: The reason we need the CSV-backed test is that only the CSV file triggers `merge_batch`; the normal (in-memory) table only goes through `update_batch`.

Review comment: For a normal table, the AggregateExec mode is Single, so it runs `update_batch`.
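To make the reviewers' point concrete, here is a rough sketch of the two call paths (an illustrative trait, not the real `datafusion_expr::Accumulator` definition). With a single input partition the planner emits `AggregateExec: mode=Single` and the accumulator only ever sees `update_batch`; with a partitioned CSV scan it emits a Partial/Final pair, and the Final operator combines the shipped partial states through `merge_batch`, which is exactly the code path the new CSV-backed test exercises.

```rust
use arrow::array::ArrayRef;
use datafusion_common::{Result, ScalarValue};

// Illustrative mirror of the aggregate accumulator call surface.
trait TwoPhaseAggregate {
    /// Fed raw input batches (Single and Partial modes).
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
    /// Partial state shipped from each Partial-mode operator.
    fn state(&self) -> Result<Vec<ScalarValue>>;
    /// Combines shipped partial states (Final mode); this is what the new test covers.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
    /// Produces the final ARRAY_AGG value.
    fn evaluate(&self) -> Result<ScalarValue>;
}
```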