Fix compute_record_batch_statistics wrong with projection #8489

Merged Dec 16, 2023 (40 commits)
Changes from 38 commits

Commits
7afeb8b
Minor: Improve the document format of JoinHashMap
Asura7969 Nov 8, 2023
6332bec
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 10, 2023
cc5e0c7
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 10, 2023
a114310
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 11, 2023
928c811
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 11, 2023
839093e
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 12, 2023
a836cde
Merge remote-tracking branch 'origin/main'
Asura7969 Nov 13, 2023
5648dc7
Merge branch 'apache:main' into main
Asura7969 Nov 13, 2023
a670409
Merge branch 'apache:main' into main
Asura7969 Nov 14, 2023
22894a3
Merge branch 'apache:main' into main
Asura7969 Nov 14, 2023
73a59d2
Merge branch 'apache:main' into main
Asura7969 Nov 15, 2023
46409c2
Merge branch 'apache:main' into main
Asura7969 Nov 16, 2023
8a86a4c
Merge branch 'apache:main' into main
Asura7969 Nov 17, 2023
cf5c584
Merge branch 'apache:main' into main
Asura7969 Nov 17, 2023
62ae9b9
Merge branch 'apache:main' into main
Asura7969 Nov 19, 2023
da02fa2
Merge branch 'apache:main' into main
Asura7969 Nov 20, 2023
d98eb2e
Merge branch 'apache:main' into main
Asura7969 Nov 21, 2023
79e7216
Merge branch 'apache:main' into main
Asura7969 Nov 21, 2023
ba51abd
Merge branch 'apache:main' into main
Asura7969 Nov 23, 2023
2468f52
Merge branch 'apache:main' into main
Asura7969 Nov 23, 2023
180c303
Merge branch 'apache:main' into main
Asura7969 Nov 24, 2023
68980ba
Merge branch 'apache:main' into main
Asura7969 Nov 27, 2023
9411940
Merge branch 'apache:main' into main
Asura7969 Nov 27, 2023
ba28346
Merge branch 'apache:main' into main
Asura7969 Nov 28, 2023
df0942f
Merge branch 'apache:main' into main
Asura7969 Nov 29, 2023
edccb66
Merge branch 'apache:main' into main
Asura7969 Nov 29, 2023
fb74b99
Merge branch 'apache:main' into main
Asura7969 Nov 30, 2023
767b004
Merge branch 'apache:main' into main
Asura7969 Dec 1, 2023
2e0eef5
Merge branch 'apache:main' into main
Asura7969 Dec 2, 2023
749e0c8
Merge branch 'apache:main' into main
Asura7969 Dec 3, 2023
5d43a94
Merge branch 'apache:main' into main
Asura7969 Dec 5, 2023
71047f3
Merge branch 'apache:main' into main
Asura7969 Dec 6, 2023
4b6921b
Merge branch 'apache:main' into main
Asura7969 Dec 7, 2023
deefdd0
fix `compute_record_batch_statistics` wrong with `projection`
Asura7969 Dec 7, 2023
c00027e
Merge branch 'apache:main' into main
Asura7969 Dec 7, 2023
d46a9f9
Merge branch 'apache:main' into main
Asura7969 Dec 8, 2023
41a520f
Merge branch 'apache:main' into main
Asura7969 Dec 11, 2023
632b460
Merge branch 'main' into fix_total_byte_size
Asura7969 Dec 11, 2023
d19294f
fix test
Asura7969 Dec 11, 2023
928cbb1
fix test
Asura7969 Dec 11, 2023
40 changes: 28 additions & 12 deletions datafusion/physical-plan/src/common.rs
@@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics};
 use arrow::datatypes::Schema;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
 use datafusion_common::stats::Precision;
 use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
@@ -139,17 +140,24 @@ pub fn compute_record_batch_statistics(
 ) -> Statistics {
     let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum();

-    let total_byte_size = batches
-        .iter()
-        .flatten()
-        .map(|b| b.get_array_memory_size())
-        .sum();
-
     let projection = match projection {
         Some(p) => p,
         None => (0..schema.fields().len()).collect(),
     };

+    let total_byte_size = batches
+        .iter()
+        .flatten()
+        .map(|b| {
+            b.columns()
+                .iter()
+                .enumerate()
+                .filter(|(index, _)| projection.contains(index))
+                .map(|(_, col)| col.get_array_memory_size())
+                .sum::<usize>()
+        })
+        .sum();
+
     let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()];

     for partition in batches.iter() {

Review comment on `b.columns()` (Contributor, Author): The `RecordBatch::project` method is not used because it clones internally, so there is no need to generate a new `RecordBatch` here. (Dandandan marked this conversation as resolved.)
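The change in this hunk can be illustrated without any Arrow dependency. The following is a minimal sketch, not the DataFusion code: `Column`, `Batch`, and `projected_byte_size` are hypothetical stand-ins, and each column simply carries a made-up size in place of what `get_array_memory_size()` would report.

```rust
/// Hypothetical stand-in for an Arrow array; only its reported size matters here.
struct Column {
    memory_size: usize,
}

/// Hypothetical stand-in for a `RecordBatch`.
struct Batch {
    columns: Vec<Column>,
}

/// Sum the sizes of the projected columns only, across all partitions.
/// `None` means "no projection", so every column is counted.
fn projected_byte_size(batches: &[Vec<Batch>], projection: Option<&[usize]>) -> usize {
    batches
        .iter()
        .flatten()
        .map(|b| {
            b.columns
                .iter()
                .enumerate()
                // Keep a column only if its index is part of the projection.
                .filter(|(index, _)| projection.map_or(true, |p| p.contains(index)))
                .map(|(_, col)| col.memory_size)
                .sum::<usize>()
        })
        .sum()
}
```

With three columns of sizes 16, 32, and 64 and a projection of `[0, 1]`, only the first two sizes are summed; the pre-patch behavior corresponds to always summing all three.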
@@ -388,6 +396,7 @@ mod tests {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use arrow_array::UInt64Array;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{col, Column};

@@ -685,20 +694,30 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![
             Field::new("f32", DataType::Float32, false),
             Field::new("f64", DataType::Float64, false),
+            Field::new("u64", DataType::UInt64, false),
         ]));
         let batch = RecordBatch::try_new(
             Arc::clone(&schema),
             vec![
                 Arc::new(Float32Array::from(vec![1., 2., 3.])),
                 Arc::new(Float64Array::from(vec![9., 8., 7.])),
+                Arc::new(UInt64Array::from(vec![4, 5, 6])),
             ],
         )?;
+
+        // just select f32,f64
+        let select_projection = Some(vec![0, 1]);
+        let byte_size = batch
+            .project(&select_projection.clone().unwrap())
+            .unwrap()
+            .get_array_memory_size();
+
         let actual =
-            compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1]));
+            compute_record_batch_statistics(&[vec![batch]], &schema, select_projection);

-        let mut expected = Statistics {
+        let expected = Statistics {
             num_rows: Precision::Exact(3),
-            total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes
+            total_byte_size: Precision::Exact(byte_size),

Review comment (Contributor, Author): I'm not sure whether this is appropriate; if you have a better suggestion, please leave a comment.

Reply (Contributor): I think this is OK, and a nice way to make the code less brittle to future changes in arrow's layout.

Reply (Member): I'm curious why the previous code used Precision::Exact(464).

Reply (Contributor): I think it happens to be the (current) size of the record batch in the test:

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(Float32Array::from(vec![1., 2., 3.])),
                Arc::new(Float64Array::from(vec![9., 8., 7.])),
                Arc::new(UInt64Array::from(vec![4, 5, 6])),
            ],
        )?;
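
The testing idea discussed in this thread, deriving the expected size through an independent path instead of hard-coding a number such as 464, can be sketched as follows. This is an illustration under stated assumptions, not the DataFusion test: `Column`, `Batch`, `project`, `memory_size`, and `projected_size` are hypothetical stand-ins, and the sizes are made up rather than taken from Arrow.

```rust
/// Hypothetical stand-in for an Arrow array with a made-up reported size.
#[derive(Clone)]
struct Column {
    memory_size: usize,
}

/// Hypothetical stand-in for a `RecordBatch`.
struct Batch {
    columns: Vec<Column>,
}

impl Batch {
    /// Hypothetical analogue of `RecordBatch::project`: keep only the listed columns.
    fn project(&self, indices: &[usize]) -> Batch {
        Batch {
            columns: indices.iter().map(|&i| self.columns[i].clone()).collect(),
        }
    }

    /// Total size of all columns in this batch.
    fn memory_size(&self) -> usize {
        self.columns.iter().map(|c| c.memory_size).sum()
    }
}

/// Code under test: filter columns by index, then sum their sizes.
fn projected_size(batch: &Batch, projection: &[usize]) -> usize {
    batch
        .columns
        .iter()
        .enumerate()
        .filter(|(index, _)| projection.contains(index))
        .map(|(_, col)| col.memory_size)
        .sum()
}
```

A test would then compare `projected_size(&batch, &projection)` against `batch.project(&projection).memory_size()`, so the expectation tracks whatever sizes the columns report rather than a constant that breaks when the memory layout changes.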

             column_statistics: vec![
                 ColumnStatistics {
                     distinct_count: Precision::Absent,
@@ -715,9 +734,6 @@ mod tests {
             ],
         };

-        // Prevent test flakiness due to undefined / changing implementation details
-        expected.total_byte_size = actual.total_byte_size.clone();
-
         assert_eq!(actual, expected);
         Ok(())
     }