
Implement pruning on partition columns #1179

Merged
merged 11 commits into main on Mar 10, 2023

Conversation

@Blajda (Collaborator) commented Feb 26, 2023

Description

Exposes partition columns in Datafusion's PruningStatistics, which reduces the number of files scanned when the table is queried.

This also resolves another partition issue involving null partitions. Previously ScalarValue::Null was used, which would cause an error when the actual datatype was obtained from the physical parquet files.
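
For context: DataFusion decides which files to skip through its PruningStatistics trait, so "exposing partition columns" means answering that trait's queries from the partition values recorded in the Delta log. Below is a minimal sketch of the idea, not the PR's actual implementation — the struct and its fields are hypothetical, and the trait shape matches DataFusion as of this PR's era.

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt64Array};
use datafusion::common::Column;
use datafusion::physical_optimizer::pruning::PruningStatistics;

// Hypothetical container: for each partition column, one value per file.
// None models a null partition value.
struct PartitionPruningStats {
    partition_values: HashMap<String, Vec<Option<String>>>,
    num_files: usize,
}

impl PruningStatistics for PartitionPruningStats {
    // A file's partition value is constant, so it serves as both min and max.
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        let values = self.partition_values.get(&column.name)?;
        Some(Arc::new(values.iter().cloned().collect::<StringArray>()) as ArrayRef)
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        self.min_values(column)
    }

    // One "container" per data file in the snapshot.
    fn num_containers(&self) -> usize {
        self.num_files
    }

    // A null partition value means the column is null for every row in that file.
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        let values = self.partition_values.get(&column.name)?;
        let counts: UInt64Array = values
            .iter()
            .map(|v| Some(if v.is_none() { 1u64 } else { 0 }))
            .collect();
        Some(Arc::new(counts) as ArrayRef)
    }
}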

Related Issue(s)

- closes #1175
@github-actions github-actions bot added binding/rust Issues for the Rust crate rust labels Feb 26, 2023
@Blajda Blajda marked this pull request as ready for review February 26, 2023 06:21
@Blajda (Collaborator, Author) commented Feb 26, 2023

Looks like the Windows build is flaking on the main branch; not sure of the cause. If possible, it would be nice to rerun just that step.

@wjones127 (Collaborator) left a comment

Thanks for writing this @Blajda.

Would you be willing to add some more tests? I think it's worth making sure this is very robust :)

@@ -510,6 +529,59 @@ impl ExecutionPlan for DeltaScan {
}
}

fn get_null_of_arrow_type(t: &ArrowDataType) -> ScalarValue {
wjones127 (Collaborator) commented:

This probably goes beyond the scope of this PR, but it would be nice to consolidate these implementations with the one here:

fn json_value_to_array_general<'a>(
    datatype: &arrow::datatypes::DataType,
    values: impl Iterator<Item = &'a serde_json::Value>,
) -> Result<ArrayRef, DeltaTableError> {

In some places we seem to be going from serde_json::Value to ScalarValue to ArrayRef, which seems a little inefficient.
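
For readers following along, here is a minimal sketch of the kind of mapping get_null_of_arrow_type performs — the arm list is abbreviated and illustrative, and the fallback matches the behavior discussed later in this review:

use arrow::datatypes::DataType as ArrowDataType;
use datafusion::scalar::ScalarValue;

// Map an Arrow type to a typed null ScalarValue so statistics for null
// partitions carry the column's real physical type instead of ScalarValue::Null.
fn get_null_of_arrow_type(t: &ArrowDataType) -> ScalarValue {
    match t {
        ArrowDataType::Utf8 => ScalarValue::Utf8(None),
        ArrowDataType::Int64 => ScalarValue::Int64(None),
        ArrowDataType::Int32 => ScalarValue::Int32(None),
        ArrowDataType::Float64 => ScalarValue::Float64(None),
        ArrowDataType::Date32 => ScalarValue::Date32(None),
        // ...remaining supported types elided...
        // Unsupported types fall back to an untyped null (see the panic
        // discussion further down).
        _ => ScalarValue::Null,
    }
}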

let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert!(metrics.num_scanned_files() == 1);

// Check pruning for null partitions. Since there are no record count statistics, pruning cannot be done
wjones127 (Collaborator) commented:

That seems odd. Why can't it eliminate the null partition here?

@Blajda (Collaborator, Author) replied:

In Datafusion, null indicates that statistics for a particular column do not exist. Also, null counts are only used for the functions is_null and is_not_null. Further work is required to accommodate the edge case where all records for a column are actually null. I imagine this can be accomplished by checking the null count statistic against a record count statistic.
I'll take a further look and raise an issue on their end.
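
A sketch of the record-count check described above (hypothetical helper, not actual DataFusion code):

// A file can be pruned for `col IS NOT NULL` only when statistics prove
// that every record in it is null for that column.
fn can_prune_all_null(null_count: Option<u64>, record_count: Option<u64>) -> bool {
    match (null_count, record_count) {
        (Some(nulls), Some(rows)) => rows > 0 && nulls == rows,
        // Missing statistics prove nothing, so the file must be kept.
        _ => false,
    }
}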


let e = col("c3").eq(lit(0));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert!(metrics.num_scanned_files() == 0);
wjones127 (Collaborator) commented:

Could we also test a conjunction and disjunction?
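
For illustration, such cases could follow the shape of the existing assertions — the predicate values here are hypothetical, and the expected file counts depend on the test table's partition layout:

// Conjunction: a file survives pruning only if it can satisfy both predicates.
let e = col("c3").eq(lit(5)).and(col("c1").eq(lit(1)));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert!(metrics.num_scanned_files() == 1);

// Disjunction: a file matching either side must be kept.
let e = col("c3").eq(lit(5)).or(col("c3").eq(lit(20)));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert!(metrics.num_scanned_files() == 2);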

rust/tests/datafusion_test.rs (thread resolved)
@@ -228,8 +228,31 @@ async fn test_datafusion_stats() -> Result<()> {
Ok(())
}

async fn get_scan_metrics(
A collaborator commented:

Not strictly required in this PR, but we have a very basic implementation for a metrics collector that I plan to promote to a first-class delta-rs member soon. The aim would be to harmonise our approach to metrics collection across operations. Just in case you also want to use that here :)

pub struct ExecutionMetricsCollector {
    scanned_files: HashSet<Label>,
}

impl ExecutionMetricsCollector {
    fn num_scanned_files(&self) -> usize {
        self.scanned_files.len()
    }
}

impl ExecutionPlanVisitor for ExecutionMetricsCollector {
    type Error = DataFusionError;

    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
        if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
            let files = get_scanned_files(exec);
            self.scanned_files.extend(files);
        }
        Ok(true)
    }
}
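
For reference, a visitor like this is driven through DataFusion's accept entry point; a hypothetical usage sketch (assuming plan is an Arc<dyn ExecutionPlan> that has already been executed):

// Hypothetical usage: walk the executed plan and count the files the
// ParquetExec actually scanned.
let mut metrics = ExecutionMetricsCollector { scanned_files: HashSet::new() };
datafusion::physical_plan::accept(plan.as_ref(), &mut metrics)?;
println!("scanned {} files", metrics.num_scanned_files());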

@Blajda (Collaborator, Author) replied:

Yeah, I'm already using this implementation, with a minor change for when there are zero records. I found it really useful!

@Blajda (Collaborator, Author) commented Mar 5, 2023

@wjones127
Tests were updated to check all primitive types.

Stats for date and binary appear incomplete, since min and max stats are not written for them.

I think decimal stats are written incorrectly: currently the underlying i128 value is written, where I was expecting the actual decimal string representation.
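
To make the decimal issue concrete, here is an illustrative helper (not delta-rs code) showing the string form one would expect in the log for a Decimal(10, 2) whose underlying i128 is 1000:

// Illustrative: render a decimal from its raw i128 representation and scale.
fn decimal_to_string(raw: i128, scale: u32) -> String {
    let divisor = 10i128.pow(scale);
    format!("{}.{:0width$}", raw / divisor, (raw % divisor).unsigned_abs(), width = scale as usize)
}

fn main() {
    // Expected log value for the decimal 10.00 ...
    assert_eq!(decimal_to_string(1000, 2), "10.00");
    // ... whereas the writer currently records the raw value "1000".
}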

Currently the following types cannot be used as partition columns:

Type        Error
float64     Unimplemented data type: Float64 (rust/src/writer/utils.rs:148:13)
float32     Unimplemented data type: Float32 (rust/src/writer/utils.rs:148:13)
timestamp   Unimplemented data type: Timestamp(Microsecond, None)
decimal     Unimplemented data type: Decimal128(10, 2) (rust/src/writer/utils.rs:148:13)
date        Unimplemented data type: Date32 (rust/src/writer/utils.rs:148:13)
binary      called `Result::unwrap()` on an `Err` value: Arrow { source: InvalidArgumentError("The data type type Binary has no natural order") } (rust/src/operations/write.rs:318:30)

@roeap (Collaborator) left a comment

Thanks for this great work, especially the tests. Happy to see that some of the bigger holes in our datafusion story are being fixed 😀.

| ArrowDataType::Interval(_)
| ArrowDataType::RunEndEncoded(_, _)
| ArrowDataType::Map(_, _) => {
    panic!("{}", format!("Implement data type for Delta Lake {}", t));
roeap (Collaborator) commented:

We usually try to be a panic-free crate, would it be possible to surface this error?

@Blajda (Collaborator, Author) replied:

Unfortunately, it's mostly used at an interface boundary with Datafusion that doesn't return a Result. I removed the panic and restored the original behavior of returning a null scalar when the type cannot be handled.

@Blajda (Collaborator, Author) commented Mar 5, 2023

If there are no additional fixes required, I'd like to wait for #1180 to be merged first so I can check whether the partition tests are resolved.

@wjones127 wjones127 self-requested a review March 6, 2023 04:38
@wjones127 (Collaborator) left a comment

Thanks for adding all these tests :) Created #1209 and #1208 as follow-ups.

Just one change I'd like to see to make the test a little more readable.

Comment on lines 434 to 471
// (Column name, value from file 1, value from file 2, value from file 3, non-existent value)
let tests = [
    ("utf8", lit("1"), lit("5"), lit("8"), lit("3")),
    (
        "int64",
        lit(1 as i64),
        lit(5 as i64),
        lit(8 as i64),
        lit(3 as i64),
    ),
    (
        "int32",
        lit(1 as i32),
        lit(5 as i32),
        lit(8 as i32),
        lit(3 as i32),
    ),
    (
        "int16",
        lit(1 as i16),
        lit(5 as i16),
        lit(8 as i16),
        lit(3 as i16),
    ),
    (
        "int8",
        lit(1 as i8),
        lit(5 as i8),
        lit(8 as i8),
        lit(3 as i8),
    ),
    (
        "float64",
        lit(1 as f64),
        lit(5 as f64),
        lit(8 as f64),
        lit(3 as f64),
    ),
wjones127 (Collaborator) commented:

Perhaps something like this, so we aren't referencing everything by position:

Suggested change
struct TestCase {
    column: &'static str,
    file1_value: Expr,
    file2_value: Expr,
    file3_value: Expr,
    non_existent_value: Expr,
}

impl TestCase {
    fn new(column: &'static str, expression_builder: impl Fn(i64) -> Expr) -> Self {
        TestCase {
            column,
            file1_value: expression_builder(1),
            file2_value: expression_builder(5),
            file3_value: expression_builder(8),
            non_existent_value: expression_builder(3),
        }
    }
}

let tests = [
    TestCase::new("utf8", |value| lit(value.to_string())),
    TestCase::new("int64", |value| lit(value)),
    TestCase::new("int32", |value| lit(value as i32)),
    TestCase::new("int16", |value| lit(value as i16)),
    TestCase::new("int8", |value| lit(value as i8)),
    TestCase::new("float64", |value| lit(value as f64)),
    TestCase::new("float32", |value| lit(value as f32)),
    // TODO: I think decimal statistics are being written to the log incorrectly. The underlying
    // i128 is written, not the proper string representation as specified by the precision and scale.
    TestCase::new("decimal", |value| {
        lit(ScalarValue::Decimal128(Some((100 * value) as i128), 10, 2))
    }),
    TestCase::new("timestamp", |value| {
        lit(ScalarValue::TimestampMicrosecond(Some(value * 1_000_000), None))
    }),
    // TODO: The writer does not write complete statistics for date columns
    TestCase::new("date", |value| lit(ScalarValue::Date32(Some(value as i32)))),
    // TODO: The writer does not write complete statistics for binary columns
    TestCase::new("binary", |value| lit(to_binary(value.to_string()))),
];
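
For completeness, a hypothetical driver loop over these cases — per the comment convention above, each of the three files holds one of the values and the fourth value matches none:

// Equality on a value held by exactly one file should prune the scan down to
// that file; a non-existent value should scan nothing.
for case in tests.iter() {
    let e = col(case.column).eq(case.file1_value.clone());
    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
    assert!(metrics.num_scanned_files() == 1);

    let e = col(case.column).eq(case.non_existent_value.clone());
    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
    assert!(metrics.num_scanned_files() == 0);
}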

wjones127 (Collaborator) commented:

Need to replace the lines below too. The GH UI doesn't make it easy to select all of them :)

@Blajda (Collaborator, Author) replied:

@wjones127 I've updated the cases based on your outline. I agree it makes it a lot easier to read. Thanks for the suggestion :)

@Blajda (Collaborator, Author) commented Mar 10, 2023

Rebased on main to retest after #1180. There are still some issues with time-type columns; see #1215.
I also omitted pruning for binary columns: Datafusion cannot prune this type yet, since a natural order does not exist. See #1214.

@wjones127 (Collaborator) left a comment

This looks excellent. Thanks for helping identify all those cases we need to fix. 😄

@wjones127 wjones127 merged commit 89742b2 into delta-io:main Mar 10, 2023
chitralverma pushed a commit to chitralverma/delta-rs that referenced this pull request Mar 17, 2023
# Description
Exposes partition columns in Datafusion's `PruningStatistics` which will
reduce the number of files scanned when the table is queried.

This also resolves another partition issue involving `null` partitions. Previously `ScalarValue::Null` was used, which would cause an error when the actual datatype was obtained from the physical parquet files.

# Related Issue(s)
- closes delta-io#1175
Labels: binding/rust (Issues for the Rust crate), rust

Successfully merging this pull request may close: File pruning does not occur on partition columns (#1175)