Implement merge command #850
Comments
From MergeIntoCommand.scala it looks like a really complex operation. I may be able to work on this. It would be great if you could help me estimate how much effort this would be. Thanks for any input!
Merge is indeed a complex operation. We started with just append/overwrite because that doesn't require a query engine; you can just pass some data and write to parquet. But for things like delete and merge, we'll have to integrate a query engine and/or allow plugging one in. (Since we need a way to query for which rows match.) We're in the early stages of designing that, so it might be a couple months before we are even ready to start implementing merge. That being said, we'd definitely like to support this and would welcome contributions 😄 . Are you planning on building your integration on top of the Rust library or the Python library? And for your use case are you more interested in having the query engine parts provided for you (probably DataFusion) or being able to pass in your own implementation?
Thanks! We would be using the Python interface and already have a Dask system on hand. It would be great if we could use that!
I'm prototyping this and found that DataFusion does not have an equivalent to input_file_name in Spark. This built-in UDF is used in the delta-spark implementation to help filter out unmatched files in all of UPDATE/DELETE/MERGE commands (without this, the entire table would be rewritten every time). It looks like there were a few starts years ago on this work, but it didn't make it through the repo split. The most current issue I'm able to find is here: apache/arrow#18601, and the previous two efforts apache/arrow#9944 and apache/arrow#9976. I think ultimately this built-in UDF should exist in DataFusion, but wondering if anyone with more familiarity with DataFusion has any ideas on how we might accomplish this in the interim? I'm just familiarizing myself with DataFusion, so, so far, the only thing that comes to mind is a pretty horrendous "read each file as a DF + project the file name, and then union all the dataframes".
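For context, a rough sketch of what that "per-file projection plus union" workaround might look like against DataFusion's DataFrame API. This is only an illustration under assumptions: method names like `read_parquet`, `with_column`, and `union` are from recent DataFusion versions and may differ, and the `__file_name` column name is a placeholder.

```rust
// Rough sketch (not delta-rs code): emulate Spark's input_file_name() by
// reading each data file separately, tagging rows with their source file,
// and unioning the results into one DataFrame.
use datafusion::prelude::*;

async fn read_with_file_name(
    ctx: &SessionContext,
    file_uris: &[String],
) -> datafusion::error::Result<DataFrame> {
    let mut combined: Option<DataFrame> = None;
    for uri in file_uris {
        // Read one file and tag every row with the file it came from.
        let df = ctx
            .read_parquet(uri.as_str(), ParquetReadOptions::default())
            .await?
            .with_column("__file_name", lit(uri.clone()))?;
        combined = Some(match combined {
            Some(acc) => acc.union(df)?,
            None => df,
        });
    }
    combined.ok_or_else(|| {
        datafusion::error::DataFusionError::Plan("no files to read".to_string())
    })
}
```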
Perhaps a decent first pass is to have a query per file? Something vaguely like (for a delete query):

let file_stream = futures::stream::iter(delta_table.files_uris());
let action_stream = file_stream.map(|file| async {
    let orig_num_rows = parquet_row_count(file).await?;
    let df = ctx.read_parquet(file);
    let res = df.filter(!delete_clause)?.collect().await?;
    if res.num_rows() == 0 {
        // Every row matched the delete predicate: remove the file entirely.
        (Some(RemoveAction { file }), None)
    } else if res.num_rows() == orig_num_rows {
        // No rows matched: leave the file untouched.
        (None, None)
    } else {
        // Some rows matched: rewrite the surviving rows into a new file.
        let new_file = generate_new_file_name();
        write_parquet(res, new_file).await?;
        (Some(RemoveAction { file }), Some(AddAction { new_file }))
    }
});
let actions = action_stream.collect_vec()?;
I think that works great for UPDATE/DELETE, but I don't see how it would work for MERGE in the eventual distributed (Ballista) case (maybe that's too much to think about for now). We need the origin of the row at runtime of the join (or post-join), unlike the static predicates in UPDATE/DELETE that we can evaluate one file at a time. Your example is actually close to how I envisioned the messy way of supporting this in merge: by projecting the file name. Anyway, in a single-node version, I suppose we could just perform the entire MERGE one file at a time, same as your DELETE example.
That's fair. I do think we should solve UPDATE/DELETE first. That will get a lot of useful stuff out of the way.
I think that's a decent first pass to prototype. Could you file a new issue in https://github.com/apache/arrow-datafusion? That's where the upstream issue should live now.
I have a functional prototype of merge done and would like to start a discussion on the public Rust interface. Currently I have something like this, which follows the same pattern as the other operations.

let (table, _metrics) = DeltaOps(table)
    .merge(source, col("id").eq(col("id_src")))
    .when_matched_update()
    .with_predicate("value >= 20")
    .with_update("value", col("value_src"))
    .with_update("modified", col("modified_src"))
    .build()
    .when_matched_delete()
    .build()
    .when_not_matched_insert()
    .with_set("id", col("id_src"))
    .with_set("value", col("value_src"))
    .with_set("modified", col("modified_src"))
    .build()
    .await?;

I don't like this approach since cargo format forces each child builder to be vertically aligned with the parent. One possible approach is to have a closure that accepts the child builder. It would look something like this.

let (table, _metrics) = DeltaOps(table)
    .merge(source, col("id").eq(col("id_src")))
    .when_matched_update(|update| {
        update
            .predicate("value >= 20")
            .update("value", col("value_src"))
            .update("modified", col("modified_src"))
    })
    .when_matched_delete(|delete| delete)
    .when_not_matched_insert(|insert| {
        insert
            .set("id", col("id_src"))
            .set("value", col("value_src"))
            .set("modified", col("modified_src"))
    })
    .await?;
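For reference, here is a minimal, self-contained sketch of how such a closure-accepting builder could be wired up. All type and field names below are illustrative placeholders, not the actual delta-rs types.

```rust
// Toy sketch of the closure-based builder pattern being proposed.
// Expressions are simplified to strings to keep the example self-contained.

#[derive(Default)]
pub struct UpdateBuilder {
    predicate: Option<String>,
    updates: Vec<(String, String)>, // (target column, source expression)
}

impl UpdateBuilder {
    pub fn predicate(mut self, pred: &str) -> Self {
        self.predicate = Some(pred.to_string());
        self
    }
    pub fn update(mut self, column: &str, expr: &str) -> Self {
        self.updates.push((column.to_string(), expr.to_string()));
        self
    }
}

#[derive(Default)]
pub struct MergeBuilder {
    update_clauses: Vec<UpdateBuilder>,
}

impl MergeBuilder {
    // The closure receives a fresh child builder and returns the configured
    // one, so the parent chain stays flat under cargo fmt.
    pub fn when_matched_update<F>(mut self, f: F) -> Self
    where
        F: FnOnce(UpdateBuilder) -> UpdateBuilder,
    {
        self.update_clauses.push(f(UpdateBuilder::default()));
        self
    }
}

fn main() {
    let merge = MergeBuilder::default().when_matched_update(|update| {
        update
            .predicate("value >= 20")
            .update("value", "value_src")
            .update("modified", "modified_src")
    });
    println!("update clauses: {}", merge.update_clauses.len());
}
```

The main design benefit is that each clause is configured inside its own scope, while the parent chain keeps a single level of indentation.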
@Blajda I agree that one looks nice. It looks closer to the API in PySpark, which I assume is what you are modeling it off of.
I like the proposed API as well. I guess it would also help us avoid keeping two APIs for the child builder in sync?
This is great @Blajda
@roeap I don't quite follow what you mean by this. It seems like the consensus is on the second proposal, and I intended to implement only one. Is your suggestion to do both?
Absolutely not. Yes, the second one seems to be the winner :). My understanding was that the closure for... I may also just have misunderstood things. In any case the effort would be manageable :).
# Description
Implement the Merge operation using DataFusion. Currently the implementation rewrites the entire DeltaTable; limiting the files that are rewritten will be done in future work.
# Related Issue(s)
- progresses #850

Co-authored-by: Will Jones <willjones127@gmail.com>
Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
@darabos @wjones127 @Blajda should this issue be closed and a new one created for an improved version of MERGE where it's not a full rewrite, but partial?
I think we should keep this open so we have a single issue to track progress. We should only close this issue once merge does not perform a full rewrite and the join operator uses a HashJoin or SortMergeJoin.
# Description
This refactors the merge operation to use DataFusion's DataFrame and LogicalPlan APIs. The NLJ is eliminated and the query planner can pick the optimal join operator. This also enables the operation to use multiple threads and should result in a significant speed-up. Merge is still limited to a single thread in some areas. When collecting benchmarks, I encountered multiple OoM issues with DataFusion's hash join implementation. There are multiple tickets open upstream regarding this. For now, I've limited the number of partitions to just 1 to prevent this. Predicates passed as SQL are also easier to use now. Previously, manual casting was required to ensure data types were aligned; now the logical plan will perform type coercion when optimizing the plan.
# Related Issues
- enhances #850
- closes #1790
- closes #1753
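For illustration, capping DataFusion at a single partition is usually done through the session configuration. A minimal sketch, assuming the standard `SessionConfig`/`SessionContext` APIs (constructor names can vary slightly between DataFusion versions):

```rust
// Minimal sketch: build a session limited to one partition, as a way to
// sidestep the hash-join memory issues mentioned above.
use datafusion::prelude::{SessionConfig, SessionContext};

fn single_partition_context() -> SessionContext {
    let config = SessionConfig::new().with_target_partitions(1);
    SessionContext::new_with_config(config)
}
```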
# Description
Implements a new DataFusion node called `MergeBarrier` that determines which files have modifications. For files that do not have modifications, a remove action is no longer created.
# Related Issue(s)
- enhances #850
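Conceptually, the change tracks per file whether the merge touched any rows and only emits a remove action when it did. A toy sketch of that decision, purely illustrative and not the actual `MergeBarrier` implementation:

```rust
// Toy sketch (hypothetical types): only files with at least one modified or
// deleted row need a remove action; untouched files are left alone.
struct FileChangeStats {
    path: String,
    updated_rows: u64,
    deleted_rows: u64,
}

fn needs_remove_action(stats: &FileChangeStats) -> bool {
    stats.updated_rows > 0 || stats.deleted_rows > 0
}

fn files_to_remove(all: &[FileChangeStats]) -> Vec<&str> {
    all.iter()
        .filter(|s| needs_remove_action(s))
        .map(|s| s.path.as_str())
        .collect()
}
```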
# Description
Update documentation to reflect the current state of merge. Since merge now supports upserts without performing a full rewrite, I'd like to mark it as "done". There are of course further optimizations that can be performed, but it is now in a usable state.
# Related Issue(s)
- closes #850
If I understand Delta's docs (https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge) correctly, merge is different from a generic write. Rather than appending to or replacing an existing table, it updates (rewrites) the affected target files. Looks like it is also implemented separately (MergeIntoCommand.scala).
Use Case
Writing a new table to Delta is not so exciting for me. I could just write a Parquet table. But updating an existing table has no substitute. I'm working on integrating a non-Spark computation system with Delta, and the merge command looks like the best way to put results back into the original table.
Related Issue(s)
#542 added basic writing support.