
feat: delete operation #1176

Merged · merged 16 commits into delta-io:main from delete-op on May 8, 2023
Conversation

@Blajda (Collaborator) commented Feb 24, 2023

Description

This is a full implementation of the Delta Delete command. Users can now delete records that match a predicate; the predicate is not limited to partition columns and may also reference non-partition columns.

It also implements a find_files function which can be reused to implement the Update command.
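For illustration only, a minimal sketch of how the new operation might be driven from Rust (the `DeltaOps(...).delete().with_predicate(...)` builder shape follows the delta-rs operations module this PR extends; the exact paths, signatures, and metrics type are assumptions, not something stated in this PR):

```rust
// Hedged sketch: DeltaOps::delete / with_predicate and the (table, metrics)
// return shape are assumptions about the builder added by this PR.
use datafusion::prelude::{col, lit};
use deltalake::operations::DeltaOps;
use deltalake::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let table = open_table("./data/my_table").await?;

    // Delete every row matching the predicate; note that the predicate may
    // reference non-partition columns as well as partition columns.
    let (_table, metrics) = DeltaOps(table)
        .delete()
        .with_predicate(col("value").gt(lit(100)))
        .await?;

    println!("delete metrics: {metrics:?}");
    Ok(())
}
```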

Related Issue(s)

Documentation

@github-actions bot added the binding/rust (Issues for the Rust crate) and rust labels on Feb 24, 2023
@Blajda changed the title from ":WIP: Delete Operation" to "feat: [WIP] Delete Operation" on Apr 24, 2023
@Blajda (Collaborator, Author) commented Apr 24, 2023

I want to continue to push this forward to have an MVP for deletion. My biggest issue with the current implementation is the find_files function, since it has to iterate through each file in series to determine whether it contains a matching record.
If Datafusion supported a method like input_file_name, the scans could be performed in parallel, which would speed up the operation.

Requires further changes once #1303 is merged.
Currently there is an optimization for when the columns in the expression are only partition columns. I may disable it until all pruning issues are resolved.

Looking forward to the great feedback as always. 😄
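Not the actual find_files code, just a toy sketch of the parallelism being asked for here: if the per-file "does it contain a matching row?" check could be isolated per file, the checks could be spawned concurrently instead of awaited in series (tokio `JoinSet`; `file_contains_match` is a hypothetical stand-in for a per-file scan):

```rust
use std::time::Duration;
use tokio::task::JoinSet;

// Hypothetical stand-in for "scan this file with the predicate, LIMIT 1".
async fn file_contains_match(path: String) -> (String, bool) {
    tokio::time::sleep(Duration::from_millis(10)).await;
    (path, true)
}

#[tokio::main]
async fn main() {
    let files = vec!["part-0.parquet".to_string(), "part-1.parquet".to_string()];

    // Spawn every per-file check at once instead of awaiting them one by one.
    let mut checks = JoinSet::new();
    for f in files {
        checks.spawn(file_contains_match(f));
    }

    let mut matched = Vec::new();
    while let Some(res) = checks.join_next().await {
        let (path, hit) = res.expect("scan task panicked");
        if hit {
            matched.push(path);
        }
    }
    println!("files that need rewriting: {matched:?}");
}
```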

@Blajda marked this pull request as ready for review on April 24, 2023 03:19
@Blajda changed the title from "feat: [WIP] Delete Operation" to "feat: Delete Operation" on Apr 24, 2023
@roeap (Collaborator) left a comment

This is great work @Blajda!

Left some comments, mostly around some Option handling. I also had a thought on how we can achieve the parallelism you mentioned when scanning files, but I'm not sure yet whether that will actually work :)

Comment on lines 597 to 598
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
table.update().await?;
Collaborator:
Here we can probably avoid some IO by returning the actions and version from execute and updating the new state directly. Essentially something like:

this.snapshot.merge(DeltaTableState::from_actions(actions, version)?, true, true);
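A toy illustration of that pattern (deliberately not using the delta-rs types; the real code would go through `DeltaTableState::from_actions` and `merge` as in the snippet above): fold the freshly committed version and actions into the snapshot already held in memory instead of re-reading the log from storage.

```rust
// Toy types only; this mirrors the idea, not the delta-rs API.
#[derive(Debug)]
struct Snapshot {
    version: i64,
    files: Vec<String>,
}

impl Snapshot {
    // Apply the effect of a commit (adds/removes) without re-reading storage.
    fn merge(&mut self, new_version: i64, added: Vec<String>, removed: &[String]) {
        self.files.retain(|f| !removed.contains(f));
        self.files.extend(added);
        self.version = new_version;
    }
}

fn main() {
    let mut snapshot = Snapshot {
        version: 3,
        files: vec!["a.parquet".into(), "b.parquet".into()],
    };

    // Pretend `execute` returned the committed version plus its actions.
    let (new_version, added, removed) =
        (4, vec!["c.parquet".to_string()], vec!["a.parquet".to_string()]);
    snapshot.merge(new_version, added, &removed);

    assert_eq!(snapshot.version, 4);
    println!("{snapshot:?}");
}
```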

Collaborator (Author):

I implemented this, and I imagine we would want to use this pattern for other operations. Should we return the version and actions in an Option? Currently I return an empty list plus the current version when there are no changes.

&execution_props,
)?;
let filter = Arc::new(FilterExec::try_new(predicate_expr, parquet_scan)?);
let limit: Arc<dyn ExecutionPlan> = Arc::new(GlobalLimitExec::new(filter, 0, Some(1)));
Collaborator:

Haven't fully thought this through, but maybe one way for us to exploit parallelism would be to create the ParquetScan in a way that each file group contains exactly one file and use LocalLimitExec. Looked a little bit through the df code, and I think the order would be preserved, so we can infer from the partition number the file that was read.

If that cannot work and we have to add the file name as a column, that should be feasible by treating it as a "virtual partition column". However, this may require some more updates and special cases in how we create the schema / statistics in various places. Including the file name as a partition value in PartitionedFile is straightforward though.
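A minimal sketch of the first idea, assuming the DataFusion (~23) `PartitionedFile` type and its `new(path, size)` constructor: shape the file groups so that every group holds exactly one file, so output partition `i` of the resulting scan reads exactly `files[i]` and a LocalLimitExec on top can stop that partition after the first match. Wiring this into a FileScanConfig / ParquetExec is omitted.

```rust
// Assumed import path and constructor signature from DataFusion ~23.
use datafusion::datasource::listing::PartitionedFile;

/// Put each candidate file into its own file group, so scan partition `i`
/// reads exactly `files[i]` and the partition index identifies the file.
fn one_file_per_group(files: &[(String, u64)]) -> Vec<Vec<PartitionedFile>> {
    files
        .iter()
        .map(|(path, size)| vec![PartitionedFile::new(path.clone(), *size)])
        .collect()
}

fn main() {
    let files = vec![
        ("part-0.parquet".to_string(), 1024),
        ("part-1.parquet".to_string(), 2048),
    ];
    let groups = one_file_per_group(&files);
    assert_eq!(groups.len(), files.len());
    println!("{} scan partitions, one file each", groups.len());
}
```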

Collaborator (Author):

Thanks. The primary insight that I missed was adding the file path as a partition column. I'll give this a try and report any issues :)

Collaborator (Author):

I was able to implement the suggestions. Let me know if you have any feedback on the new implementation.

@wjones127 (Collaborator) left a comment

I'll take a closer look tomorrow, but provided some initial comments. Thanks for working on this!

// rewrite phases.
match expr {
Expr::ScalarVariable(_, _) | Expr::Literal(_) => (),
Expr::Alias(expr, _) => validate_expr(expr, partition_columns, properties)?,
Collaborator:

Perhaps we can avoid recursion by using the TreeNode trait methods on Expr?
https://docs.rs/datafusion/23.0.0/datafusion/prelude/enum.Expr.html#impl-TreeNode-for-Expr

That would help avoid stack overflows if there is a very nested expression. I could imagine that happening if someone passes a filter like:

x = 1
OR x = 2
OR x = 3
...
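A hedged sketch of the suggestion (`apply` and `VisitRecursion` are taken from the DataFusion 23 TreeNode docs linked above; the allow-list of variants here is purely illustrative, not the PR's actual validation rules):

```rust
use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::{col, lit, Expr};

/// Walk the expression via TreeNode::apply instead of hand-written recursion,
/// rejecting variants the delete path does not support.
fn validate_expr(expr: &Expr) -> Result<()> {
    expr.apply(&mut |e| match e {
        // Illustrative allow-list only.
        Expr::Column(_) | Expr::Literal(_) | Expr::BinaryExpr(_) => {
            Ok(VisitRecursion::Continue)
        }
        other => Err(DataFusionError::Plan(format!(
            "unsupported expression in delete predicate: {other}"
        ))),
    })?;
    Ok(())
}

fn main() -> Result<()> {
    // A long chain of ORs no longer grows our own call stack.
    let predicate = col("x")
        .eq(lit(1))
        .or(col("x").eq(lit(2)))
        .or(col("x").eq(lit(3)));
    validate_expr(&predicate)
}
```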

Collaborator (Author):

The Datafusion implementation still uses recursion, but overflow issues are now on them. The visitor pattern also makes the code cleaner!

Collaborator:

The Datafusion implementation still uses recursion, but overflow issues are now on them

🤣

@github-actions bot commented:
ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@Blajda changed the title from "feat: Delete Operation" to "feat: delete operation" on Apr 30, 2023
@wjones127 (Collaborator) left a comment

Thanks for the updates. I have a few more suggestions.


#[tokio::test]
async fn test_delete_on_nonpartiton_column() {
// Delete based on a nonpartition column
Collaborator:

How are null values handled? For example, if I have a column x: [1, 2, null, 4], does DELETE FROM table_name WHERE x > 2 delete just the last row, or also the third row? Seems like it's worth a unit test to verify.

Collaborator (Author):

Yeah, great call out.
The previous commit would have deleted the row. I've added a new test and changed the behavior so the third record is not deleted in this case.

A record should only be deleted if the predicate evaluates to true; otherwise it is kept. null > 2 evaluates to UNKNOWN.
I've checked the Spark implementation and the behavior aligns.
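A dependency-free toy illustration of the agreed semantics (this is not the new unit test, just SQL three-valued logic modeled with `Option`): a row is deleted only when the predicate is definitively true, so the null row survives `x > 2`.

```rust
/// `x > 2` under SQL three-valued logic: a NULL input yields UNKNOWN (None),
/// which is treated like false for deletion purposes.
fn predicate(x: Option<i32>) -> Option<bool> {
    x.map(|v| v > 2)
}

fn main() {
    let column = vec![Some(1), Some(2), None, Some(4)];

    // Keep a row unless the predicate evaluates to true.
    let kept: Vec<Option<i32>> = column
        .into_iter()
        .filter(|x| predicate(*x) != Some(true))
        .collect();

    // Only the last row (4) is deleted; the NULL row is kept.
    assert_eq!(kept, vec![Some(1), Some(2), None]);
    println!("{kept:?}");
}
```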

}

// Create a record batch that contains the partition columns plus the path of the file
fn create_partition_record_batch(
Collaborator:

Maybe you can re-use this function for now:

pub fn add_actions_table(

It handles the different data types for partition values. It has a little more overhead since it also parses out the statistics, but I think that's fine for now. Later on, I expect we'll replace this with expression simplification, which will let us use statistics and remove redundant parts of the predicate:

apache/datafusion#6171

@Blajda (Collaborator, Author) commented May 6, 2023

Hi @wjones127 @roeap,
I've made the requested changes. Let me know if it is good to merge or if there are any additional concerns.

roeap previously approved these changes on May 6, 2023
@roeap (Collaborator) left a comment

From my end this looks great - thanks for this excellent contribution @Blajda.

I'll leave it open though for @wjones127 to chime in.

@wjones127 merged commit 0115fbb into delta-io:main on May 8, 2023
@Blajda deleted the delete-op branch on May 10, 2023
Labels: binding/rust (Issues for the Rust crate), rust

Successfully merging this pull request may close these issues: Implement simple delete case

3 participants