Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update datafusion to 42.0 and arrow to 53.2 #3176

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,162 changes: 696 additions & 466 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 22 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ lance-test-macros = { version = "=0.19.2", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.19.2", path = "./rust/lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
arrow = { version = "52.2", optional = false, features = ["prettyprint"] }
arrow-arith = "52.2"
arrow-array = "52.2"
arrow-buffer = "52.2"
arrow-cast = "52.2"
arrow-data = "52.2"
arrow-ipc = { version = "52.2", features = ["zstd"] }
arrow-ord = "52.2"
arrow-row = "52.2"
arrow-schema = "52.2"
arrow-select = "52.2"
arrow = { version = "53.2", optional = false, features = ["prettyprint"] }
arrow-arith = "53.2"
arrow-array = "53.2"
arrow-buffer = "53.2"
arrow-cast = "53.2"
arrow-data = "53.2"
arrow-ipc = { version = "53.2", features = ["zstd"] }
arrow-ord = "53.2"
arrow-row = "53.2"
arrow-schema = "53.2"
arrow-select = "53.2"
async-recursion = "1.0"
async-trait = "0.1"
aws-config = "1.2.0"
Expand All @@ -95,18 +95,18 @@ criterion = { version = "0.5", features = [
"html_reports",
] }
crossbeam-queue = "0.3"
datafusion = { version = "41.0", default-features = false, features = [
datafusion = { version = "42.0", default-features = false, features = [
"nested_expressions",
"regex_expressions",
"unicode_expressions",
] }
datafusion-common = "41.0"
datafusion-functions = { version = "41.0", features = ["regex_expressions"] }
datafusion-sql = "41.0"
datafusion-expr = "41.0"
datafusion-execution = "41.0"
datafusion-optimizer = "41.0"
datafusion-physical-expr = { version = "41.0", features = [
datafusion-common = "42.0"
datafusion-functions = { version = "42.0", features = ["regex_expressions"] }
datafusion-sql = "42.0"
datafusion-expr = "42.0"
datafusion-execution = "42.0"
datafusion-optimizer = "42.0"
datafusion-physical-expr = { version = "42.0", features = [
"regex_expressions",
] }
deepsize = "0.2.0"
Expand All @@ -129,9 +129,9 @@ pin-project = "1.0"
path_abs = "0.5"
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
proptest = "1.3.1"
prost = "0.12.2"
prost-build = "0.12.2"
prost-types = "0.12.2"
prost = "0.13.2"
prost-build = "0.13.2"
prost-types = "0.13.2"
rand = { version = "0.8.3", features = ["small_rng"] }
rangemap = { version = "1.0" }
rayon = "1.10"
Expand Down
10 changes: 10 additions & 0 deletions rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ impl From<prost::EncodeError> for Error {
}
}

/// Converts a prost 0.13 `UnknownEnumValue` (returned when a protobuf message
/// carries an enum discriminant this build does not recognize) into the
/// crate's `Error` type, wrapping it as an `IO` variant.
///
/// `#[track_caller]` together with `std::panic::Location::caller()` records
/// the call site of the `?` / `.into()` that triggered the conversion rather
/// than this function itself, matching the other `From` impls in this file.
impl From<prost::UnknownEnumValue> for Error {
#[track_caller]
fn from(e: prost::UnknownEnumValue) -> Self {
Self::IO {
// Box the original error and keep it as the source chain.
source: box_error(e),
location: std::panic::Location::caller().to_snafu_location(),
}
}
}

impl From<tokio::task::JoinError> for Error {
#[track_caller]
fn from(e: tokio::task::JoinError) -> Self {
Expand Down
5 changes: 3 additions & 2 deletions rust/lance-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait = { version = "41.0", optional = true }
datafusion-substrait = { version = "42.0", optional = true }
futures.workspace = true
lance-arrow.workspace = true
lance-core = { workspace = true, features = ["datafusion"] }
Expand All @@ -32,7 +32,8 @@ snafu.workspace = true
tokio.workspace = true

[dev-dependencies]
substrait-expr = { version = "0.2.1" }
# TODO: This is too old
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe somebody needs to also update substrait-expr to use prost 0.13 to match the version used by datafusion-substrait

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess that's on me. How do you feel about bumping all the way to datafusion 43? That release should include a change I made which will let us drop substrait-expr entirely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version 0.2.2 of substrait-expr is now available and will include prost 0.13

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about bumping all the way to datafusion 43

There were a couple more API surfaces that needed modifying, so I was focusing on the minimal change possible, but I don't have a philosophical objection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Let's keep it minimal for now. I'll need to do some more work to get rid of substrait-expr anyways.

#substrait-expr = { version = "0.2.1" }
lance-datagen.workspace = true

[features]
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-datafusion/src/substrait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<E
Ok(expr.data)
}

/*
#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -440,3 +441,4 @@ mod tests {
assert_eq!(df_expr, expected);
}
}
*/
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
.null_bit_buffer(null_buf);
}

let buf = bytes.into();
let buf = Buffer::from_vec(bytes.to_vec());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This triggers a data copy (and could be the source of your alignment issues). What changed in arrow-rs to necessitate this change? It should still be possible to go from Bytes to Buffer with zero-copy somehow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@jleibs jleibs Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the comment above I believe:
Buffer::from_vec(bytes.into()) is the intended pathway, though I also tried bytes.to_byte_slice().into(), but both still result in the same error:

---- index::vector::ivf::tests::test_create_ivf_hnsw_with_empty_partition stdout ----
thread 'index::vector::ivf::tests::test_create_ivf_hnsw_with_empty_partition' panicked at /home/jleibs/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-buffer-53.3.0/src/buffer/scalar.rs:133:42:
Memory pointer is not aligned with the specified scalar type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's still going to trigger a copy it seems. I'll take a look real quick.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's still going to trigger a copy it seems.

Is it?

My reads of both:

  • impl From<Bytes> for Vec<u8>
  • Buffer::from_vec()

Look like they should both be zero-copy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I saw bytes.to_byte_slice().into() which I think is not zero-copy. You are right that Buffer::from_vec(bytes.into()) should be zero-copy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no Buffer::from_vec(bytes.into()) will not be zero copy because bytes.into() is not zero-copy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nevermind, it looks like bytes does have a zero-copy conversion to Vec<u8>. My bad.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing your fix was probably ok here. The root cause was probably just the deep_copy_buffer change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha. Those new routines are much more clear. Thanks.

let array_data = data_builder
.add_buffer(offset_data.buffers()[0].clone())
.add_buffer(buf)
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-io/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub fn bytes_to_array(

// alignment or size isn't right -- just make a copy
if (bytes.len() < min_buffer_size) || (bytes.as_ptr().align_offset(*alignment) != 0) {
bytes.into()
Buffer::from_vec(bytes.to_vec())
} else {
// SAFETY: the alignment is correct we can make this conversion
unsafe {
Expand All @@ -218,7 +218,7 @@ pub fn bytes_to_array(
}
} else {
// cases we don't handle, just copy
bytes.into()
Buffer::from_vec(bytes.to_vec())
};

let array_data = ArrayDataBuilder::new(data_type.clone())
Expand Down
1 change: 1 addition & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ arrow.workspace = true
datafusion.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datafusion-expr.workspace = true
lapack = { version = "0.19.0", optional = true }
snafu = { workspace = true }
log = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/datafusion/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{any::Any, sync::Arc};
use std::{any::Any, borrow::Cow, sync::Arc};

use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
Expand Down Expand Up @@ -34,7 +34,7 @@ impl TableProvider for Dataset {
None
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
None
}

Expand Down
25 changes: 13 additions & 12 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use datafusion::physical_plan::{
filter::FilterExec,
limit::GlobalLimitExec,
repartition::RepartitionExec,
udaf::create_aggregate_expr,
union::UnionExec,
ExecutionPlan, SendableRecordBatchStream,
};
use datafusion::scalar::ScalarValue;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
use futures::stream::{Stream, StreamExt};
use futures::TryStreamExt;
Expand Down Expand Up @@ -957,17 +957,18 @@ impl Scanner {
let plan = self.create_plan().await?;
// Datafusion interprets COUNT(*) as COUNT(1)
let one = Arc::new(Literal::new(ScalarValue::UInt8(Some(1))));
let count_expr = create_aggregate_expr(
&count_udaf(),
&[one],
&[lit(1)],
&[],
&[],
&plan.schema(),
None,
false,
false,
)?;

let input_phy_exprs: &[Arc<dyn PhysicalExpr>] = &[one];
let schema = plan.schema();

let mut builder = AggregateExprBuilder::new(count_udaf(), input_phy_exprs.to_vec());
//builder = builder.logical_exprs(input_exprs.to_vec());
builder = builder.schema(schema);
// TODO: This alias seem to be required?
builder = builder.alias("count".to_string());

let count_expr = builder.build()?;

let plan_schema = plan.schema();
let count_plan = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
Expand Down
Loading