From 022dc1beed70263629b0007534034e7a5bee63fe Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sat, 16 Sep 2023 18:56:45 -0700 Subject: [PATCH] update datafusion to 31 --- Cargo.toml | 30 +++++++++++++++--------------- rust/Cargo.toml | 4 ++-- rust/src/operations/mod.rs | 2 +- rust/src/storage/mod.rs | 4 ++-- rust/src/writer/record_batch.rs | 17 ++++++----------- 5 files changed, 26 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c634212568..85de7c3033 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,23 +15,23 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "45" } -arrow-array = { version = "45" } -arrow-buffer = { version = "45" } -arrow-cast = { version = "45" } -arrow-ord = { version = "45" } -arrow-row = { version = "45" } -arrow-schema = { version = "45" } -arrow-select = { version = "45" } -parquet = { version = "45" } +arrow = { version = "46" } +arrow-array = { version = "46" } +arrow-buffer = { version = "46" } +arrow-cast = { version = "46" } +arrow-ord = { version = "46" } +arrow-row = { version = "46" } +arrow-schema = { version = "46" } +arrow-select = { version = "46" } +parquet = { version = "46" } # datafusion -datafusion = { version = "30" } -datafusion-expr = { version = "30" } -datafusion-common = { version = "30" } -datafusion-proto = { version = "30" } -datafusion-sql = { version = "30" } -datafusion-physical-expr = { version = "30" } +datafusion = { version = "31" } +datafusion-expr = { version = "31" } +datafusion-common = { version = "31" } +datafusion-proto = { version = "31" } +datafusion-sql = { version = "31" } +datafusion-physical-expr = { version = "31" } # serde serde = { version = "1", features = ["derive"] } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 4c8ebea213..1f523ff13f 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -73,7 +73,7 @@ log = "0" libc = ">=0.2.90, <1" num-bigint = "0.4" num-traits = "0.2.15" -object_store = "0.6.1" +object_store = "0.7" once_cell = "1.16.0" parking_lot = "0.12" parquet2 = { version = "0.17", optional = true } @@ -99,7 +99,7 @@ reqwest-retry = { version = "0.2.2", optional = true } # Datafusion dashmap = { version = "5", optional = true } -sqlparser = { version = "0.36", optional = true } +sqlparser = { version = "0.37", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 0260952700..28dea06777 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -202,8 +202,8 @@ impl AsRef for DeltaOps { mod datafusion_utils { use std::sync::Arc; - use arrow_array::RecordBatch; use arrow_schema::SchemaRef; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result as DataFusionResult; use datafusion::physical_plan::DisplayAs; use datafusion::physical_plan::{ diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 6d4bd080e0..9798a10b1a 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -74,7 +74,7 @@ impl DeltaObjectStore { /// /// # Arguments /// - /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located). /// * `location` - A url corresponding to the storage location of `storage`. pub fn new(storage: Arc, location: Url) -> Self { Self { @@ -213,7 +213,7 @@ impl ObjectStore for DeltaObjectStore { /// Perform a get request with options /// - /// Note: options.range will be ignored if [`GetResult::File`] + /// Note: options.range will be ignored if [`object_store::GetResultPayload::File`] async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { self.storage.get_opts(location, options).await } diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 2b62fe686c..9570d4350e 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -29,7 +29,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow::array::{Array, UInt32Array}; -use arrow::compute::{lexicographical_partition_ranges, take, SortColumn}; +use arrow::compute::{partition, take}; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; @@ -364,24 +364,19 @@ pub(crate) fn divide_by_partition_values( let indices = lexsort_to_indices(sort_columns.columns()); let sorted_partition_columns = partition_columns .iter() - .map(|c| { - Ok(SortColumn { - values: take(values.column(schema.index_of(c)?), &indices, None)?, - options: None, - }) - }) + .map(|c| Ok(take(values.column(schema.index_of(c)?), &indices, None)?)) .collect::, DeltaWriterError>>()?; - let partition_ranges = lexicographical_partition_ranges(sorted_partition_columns.as_slice())?; + let partition_ranges = partition(sorted_partition_columns.as_slice())?; - for range in partition_ranges { + for range in partition_ranges.ranges().into_iter() { // get row indices for current partition let idx: UInt32Array = (range.start..range.end) .map(|i| Some(indices.value(i))) .collect(); - let partition_key_iter = sorted_partition_columns.iter().map(|c| { - stringified_partition_value(&c.values.slice(range.start, range.end - range.start)) + let partition_key_iter = sorted_partition_columns.iter().map(|col| { + stringified_partition_value(&col.slice(range.start, range.end - range.start)) }); let mut partition_values = HashMap::new();