rust merge error - datafusion panics #1790

Closed
PiotrGierda opened this issue Oct 31, 2023 · 4 comments · Fixed by #1720
Labels
binding/rust (Issues for the Rust crate), bug (Something isn't working)

Comments

@PiotrGierda

Environment

Delta-rs version:
0.16.2
Binding:
rust


Bug

What happened:
The code panics when trying to merge a DataFusion DataFrame into a DeltaTable (on MinIO S3).
Could it be connected to #1699?

What you expected to happen:
No panic. Originally I had INSERT clauses as well, but I removed them to simplify the example, since the error was exactly the same.

How to reproduce it:
I've made a minimal example here: https://github.com/PiotrGierda/minimal_example_deltalake_datafusion
You need to provide valid MinIO credentials and connection details as environment variables when running with cargo run.
https://github.com/PiotrGierda/minimal_example_deltalake_datafusion/blob/main/src/main.rs#L99C5-L102C25:

let (_table, _metrics) = DeltaOps(target_table)
        .merge(source_df, col("__id").eq(col("source.__id")))
        .with_source_alias("source")
        .await.unwrap();

results in:

     Running `target/debug/minimal_example`
+--------+-------------+-------------+
| __id   | __createdat | __updatedat |
+--------+-------------+-------------+
| csvboi | 1698670149  | 1698670149  |
+--------+-------------+-------------+

thread 'main' panicked at 'called Result::unwrap() on an Err value: Generic("Internal error: CoalescePartitionsExec requires at least one input partition\n\nbacktrace: 0: datafusion_common::error::DataFusionError::get_back_trace\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-common-31.0.0/src/error.rs:423:26\n 1: <datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/coalesce_partitions.rs:128:18\n 2: <datafusion::physical_plan::joins::nested_loop_join::NestedLoopJoinExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/joins/nested_loop_join.rs:250:31\n 3: <datafusion::physical_plan::projection::ProjectionExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/projection.rs:326:20\n 4: <datafusion::physical_plan::projection::ProjectionExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/projection.rs:326:20\n 5: <deltalake::operations::datafusion_utils::MetricObserverExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/deltalake-0.16.2/src/operations/mod.rs:324:23\n 6: <datafusion::physical_plan::filter::FilterExec as datafusion::physical_plan::ExecutionPlan>::execute\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/filter.rs:179:20\n 7: <datafusion::physical_plan::projection::ProjectionExec as datafusion::physical_plan::ExecutionPlan>::execute\n at 
/home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/datafusion-31.0.0/src/physical_plan/projection.rs:326:20\n 8: deltalake::operations::write::write_execution_plan::{{closure}}\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/deltalake-0.16.2/src/operations/write.rs:279:26\n 9: deltalake::operations::merge::execute::{{closure}}\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/deltalake-0.16.2/src/operations/merge.rs:990:6\n 10: <deltalake::operations::merge::MergeBuilder as core::future::into_future::IntoFuture>::into_future::{{closure}}\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/deltalake-0.16.2/src/operations/merge.rs:1087:14\n 11: <core::pin::Pin

as core::future::future::Future>::poll\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/future/future.rs:125:9\n 12: minimal_example::main::{{closure}}\n at ./src/main.rs:102:10\n 13: tokio::runtime::park::CachedParkThread::block_on::{{closure}}\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:63\n 14: tokio::runtime::coop::with_budget\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:107:5\n 15: tokio::runtime::coop::budget\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:73:5\n 16: tokio::runtime::park::CachedParkThread::block_on\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:31\n 17: tokio::runtime::context::blocking::BlockingRegionGuard::block_on\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/blocking.rs:66:9\n 18: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:87:13\n 19: tokio::runtime::context::runtime::enter_runtime\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/runtime.rs:65:16\n 20: tokio::runtime::scheduler::multi_thread::MultiThread::block_on\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:86:9\n 21: tokio::runtime::runtime::Runtime::block_on\n at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/runtime.rs:350:45\n 22: minimal_example::main\n at ./src/main.rs:99:5\n 23: core::ops::function::FnOnce::call_once\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/ops/function.rs:250:5\n 24: 
std::sys_common::backtrace::__rust_begin_short_backtrace\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/sys_common/backtrace.rs:135:18\n 25: std::rt::lang_start::{{closure}}\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/rt.rs:166:18\n 26: core::ops::function::impls::<impl core::ops::function::FnOnce for &F>::call_once\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/ops/function.rs:284:13\n 27: std::panicking::try::do_call\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:500:40\n 28: std::panicking::try\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:464:19\n 29: std::panic::catch_unwind\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panic.rs:142:14\n 30: std::rt::lang_start_internal::{{closure}}\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/rt.rs:148:48\n 31: std::panicking::try::do_call\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:500:40\n 32: std::panicking::try\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:464:19\n 33: std::panic::catch_unwind\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panic.rs:142:14\n 34: std::rt::lang_start_internal\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/rt.rs:148:20\n 35: std::rt::lang_start\n at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/rt.rs:165:17\n 36: main\n 37: __libc_start_call_main\n at ./csu/../sysdeps/nptl/libc_start_call_main.h:58:16\n 38: __libc_start_main_impl\n at ./csu/../csu/libc-start.c:392:3\n 39: _start\n.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")', src/main.rs:102:16
stack backtrace:
0: rust_begin_unwind
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/panicking.rs:593:5
1: core::panicking::panic_fmt
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/panicking.rs:67:14
2: core::result::unwrap_failed
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/result.rs:1651:5
3: core::result::Result<T,E>::unwrap
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/result.rs:1076:23
4: minimal_example::main::{{closure}}
at ./src/main.rs:99:30
5: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:63
6: tokio::runtime::coop::with_budget
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:107:5
7: tokio::runtime::coop::budget
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:73:5
8: tokio::runtime::park::CachedParkThread::block_on
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:31
9: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/blocking.rs:66:9
10: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
11: tokio::runtime::context::runtime::enter_runtime
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/runtime.rs:65:16
12: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
13: tokio::runtime::runtime::Runtime::block_on
at /home/piotr/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/runtime.rs:350:45
14: minimal_example::main
at ./src/main.rs:99:5
15: core::ops::function::FnOnce::call_once
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with RUST_BACKTRACE=full for a verbose backtrace.

More details:
The same happens with INSERT clauses and with some data already in the table.

@PiotrGierda PiotrGierda added the bug Something isn't working label Oct 31, 2023
@rtyler rtyler added the binding/rust Issues for the Rust crate label Oct 31, 2023
@Blajda
Collaborator

Blajda commented Nov 2, 2023

@PiotrGierda Thanks for making a minimal example for this.
Are you using merge with a table that contains zero records? I think that's a more likely root cause than MinIO integration issues.

@PiotrGierda
Author

PiotrGierda commented Nov 2, 2023

Yeah, you're right, that was the issue. Thank you!
I thought I had replicated the issue on a non-empty table as well, but I was getting a different error on that one.

By the way, is there a way to make merge work on an empty table, or should we check for emptiness before running merge and fall back to a regular write when the table is empty?

@Blajda
Collaborator

Blajda commented Nov 3, 2023

@PiotrGierda MERGE is being refactored to use logical plans in #1720, which will hopefully resolve this issue.
I'll add an additional test case to ensure the operation works with empty tables.
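
For readers hitting this on 0.16.2, the workaround raised in the question above (check for emptiness and fall back to a regular write) could be sketched roughly as follows. This is only the decision logic in plain Rust: `UpsertPlan`, `plan_upsert`, and `active_file_count` are hypothetical names and not part of the deltalake API, and the sketch assumes that "empty" means the target table has no active data files.

```rust
/// Which operation to run against the target table.
/// Hypothetical type for illustration, not a deltalake type.
#[derive(Debug, PartialEq)]
enum UpsertPlan {
    /// Target has no data files: append the source with a regular write.
    PlainWrite,
    /// Target already holds data: run the merge.
    Merge,
}

/// Pick the operation from the number of active data files in the target.
/// `active_file_count` stands in for whatever the table API reports
/// (assumption: zero files means the table contains zero records).
fn plan_upsert(active_file_count: usize) -> UpsertPlan {
    if active_file_count == 0 {
        UpsertPlan::PlainWrite
    } else {
        UpsertPlan::Merge
    }
}
```

A caller would branch on the returned value and invoke either the write or the merge builder. A table that has data files but zero rows would still be routed to merge by this check, so it is a heuristic rather than a guarantee.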

@PiotrGierda
Author

Cool, thank you.

Blajda added a commit that referenced this issue Nov 19, 2023
# Description
This refactors the merge operation to use DataFusion's DataFrame and
LogicalPlan APIs.

The nested loop join (NLJ) is eliminated and the query planner can pick the optimal join
operator. This also enables the operation to use multiple threads and
should result in a significant speedup.

Merge is still limited to using a single thread in some areas. When
collecting benchmarks, I encountered multiple OOM issues with
DataFusion's hash join implementation. There are multiple tickets
open upstream regarding this. For now, I've limited the number of
partitions to just 1 to prevent this.
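
To illustrate why dropping the nested loop join matters, here is a small self-contained sketch in plain Rust (not DataFusion code) of two join strategies for an equality predicate such as the `__id` match from the issue. The function names and the `(String, i64)` row shape are assumptions made up for this example. The nested loop version compares every target row with every source row, while the hash version, one of the operators a planner may choose for an equi-join, builds a lookup table on one side and probes it once per row of the other.

```rust
use std::collections::HashMap;

/// Illustrative row shape: (id, value). Not a DataFusion type.
type Row = (String, i64);

/// Nested loop join: compares every target row with every source row,
/// so the work grows with |target| * |source|.
fn nested_loop_join(target: &[Row], source: &[Row]) -> Vec<(String, i64, i64)> {
    let mut out = Vec::new();
    for (t_id, t_val) in target {
        for (s_id, s_val) in source {
            if t_id == s_id {
                out.push((t_id.clone(), *t_val, *s_val));
            }
        }
    }
    out
}

/// Hash join: build a map over the source once, then probe it per target row,
/// so the work grows roughly with |target| + |source| plus the matches.
fn hash_join(target: &[Row], source: &[Row]) -> Vec<(String, i64, i64)> {
    // Build phase: group source values by id, preserving source order.
    let mut build: HashMap<&str, Vec<i64>> = HashMap::new();
    for (s_id, s_val) in source {
        build.entry(s_id.as_str()).or_default().push(*s_val);
    }
    // Probe phase: look up each target id in the map.
    let mut out = Vec::new();
    for (t_id, t_val) in target {
        if let Some(matches) = build.get(t_id.as_str()) {
            for s_val in matches {
                out.push((t_id.clone(), *t_val, *s_val));
            }
        }
    }
    out
}
```

Both functions return the same rows in the same order. The build-side map is also where the memory cost of a hash join comes from, since it has to hold one whole input.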

Predicates passed as SQL are also easier to use now. Previously, manual casting was
required to ensure data types were aligned. Now the logical plan will
perform type coercion when optimizing the plan.

# Related Issues
- enhances #850
- closes #1790 
- closes #1753
ion-elgreco pushed a commit to ion-elgreco/delta-rs that referenced this issue Nov 20, 2023