
Commit 022dc1b
update datafusion to 31
houqp committed Sep 17, 2023
1 parent f6cbdc9 commit 022dc1b
Showing 5 changed files with 26 additions and 31 deletions.
30 changes: 15 additions & 15 deletions Cargo.toml
@@ -15,23 +15,23 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "45" }
-arrow-array = { version = "45" }
-arrow-buffer = { version = "45" }
-arrow-cast = { version = "45" }
-arrow-ord = { version = "45" }
-arrow-row = { version = "45" }
-arrow-schema = { version = "45" }
-arrow-select = { version = "45" }
-parquet = { version = "45" }
+arrow = { version = "46" }
+arrow-array = { version = "46" }
+arrow-buffer = { version = "46" }
+arrow-cast = { version = "46" }
+arrow-ord = { version = "46" }
+arrow-row = { version = "46" }
+arrow-schema = { version = "46" }
+arrow-select = { version = "46" }
+parquet = { version = "46" }
 
 # datafusion
-datafusion = { version = "30" }
-datafusion-expr = { version = "30" }
-datafusion-common = { version = "30" }
-datafusion-proto = { version = "30" }
-datafusion-sql = { version = "30" }
-datafusion-physical-expr = { version = "30" }
+datafusion = { version = "31" }
+datafusion-expr = { version = "31" }
+datafusion-common = { version = "31" }
+datafusion-proto = { version = "31" }
+datafusion-sql = { version = "31" }
+datafusion-physical-expr = { version = "31" }
 
 # serde
 serde = { version = "1", features = ["derive"] }
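Note: the [workspace.dependencies] table above only pins versions centrally; member crates still opt in per dependency. A minimal sketch, under the assumption that a member manifest inherits the arrow and datafusion pins via Cargo's workspace inheritance (the optional flag is illustrative):

    # Hypothetical member-crate manifest inheriting the workspace pins above.
    [dependencies]
    arrow = { workspace = true }
    datafusion = { workspace = true, optional = true }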
4 changes: 2 additions & 2 deletions rust/Cargo.toml
@@ -73,7 +73,7 @@ log = "0"
 libc = ">=0.2.90, <1"
 num-bigint = "0.4"
 num-traits = "0.2.15"
-object_store = "0.6.1"
+object_store = "0.7"
 once_cell = "1.16.0"
 parking_lot = "0.12"
 parquet2 = { version = "0.17", optional = true }
@@ -99,7 +99,7 @@ reqwest-retry = { version = "0.2.2", optional = true }
 # Datafusion
 dashmap = { version = "5", optional = true }
 
-sqlparser = { version = "0.36", optional = true }
+sqlparser = { version = "0.37", optional = true }
 
 # NOTE dependencies only for integration tests
 fs_extra = { version = "1.2.0", optional = true }
2 changes: 1 addition & 1 deletion rust/src/operations/mod.rs
@@ -202,8 +202,8 @@ impl AsRef<DeltaTable> for DeltaOps {
 mod datafusion_utils {
     use std::sync::Arc;
 
-    use arrow_array::RecordBatch;
     use arrow_schema::SchemaRef;
+    use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::error::Result as DataFusionResult;
     use datafusion::physical_plan::DisplayAs;
     use datafusion::physical_plan::{
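The import swap above moves from the standalone arrow-array crate to DataFusion's arrow re-export, which keeps the arrow version in lockstep with whatever DataFusion 31 was built against. A minimal sketch of building a RecordBatch through that re-export (the `id` column and values are illustrative):

    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Int64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::error::ArrowError;
    use datafusion::arrow::record_batch::RecordBatch;

    // Build a one-column batch via the `datafusion::arrow` re-export, so the
    // arrow version always matches the one DataFusion itself compiles against.
    fn example_batch() -> Result<RecordBatch, ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        RecordBatch::try_new(schema, vec![ids])
    }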
4 changes: 2 additions & 2 deletions rust/src/storage/mod.rs
@@ -74,7 +74,7 @@ impl DeltaObjectStore {
     ///
     /// # Arguments
     ///
-    /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located).
+    /// * `storage` - A shared reference to an [`object_store::ObjectStore`] with "/" pointing at delta table root (i.e. where `_delta_log` is located).
     /// * `location` - A url corresponding to the storage location of `storage`.
     pub fn new(storage: Arc<DynObjectStore>, location: Url) -> Self {
         Self {
@@ -213,7 +213,7 @@ impl ObjectStore for DeltaObjectStore {
 
     /// Perform a get request with options
     ///
-    /// Note: options.range will be ignored if [`GetResult::File`]
+    /// Note: options.range will be ignored if [`object_store::GetResultPayload::File`]
     async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
        self.storage.get_opts(location, options).await
    }
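The doc-link fix above tracks an API change in object_store 0.7: GetResult became a struct, and the old File/Stream variants moved onto its payload field as the GetResultPayload enum. A hedged sketch of matching on it (the describe helper is made up for illustration):

    use object_store::{GetResult, GetResultPayload};

    // Illustrative helper: report how a get request was fulfilled.
    fn describe(result: &GetResult) -> &'static str {
        match &result.payload {
            // Filesystem-backed stores hand back the file itself, which is why
            // a requested byte range can be ignored on this path.
            GetResultPayload::File(_file, _path) => "local file",
            GetResultPayload::Stream(_stream) => "byte stream",
        }
    }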
17 changes: 6 additions & 11 deletions rust/src/writer/record_batch.rs
@@ -29,7 +29,7 @@
 use std::{collections::HashMap, sync::Arc};
 
 use arrow::array::{Array, UInt32Array};
-use arrow::compute::{lexicographical_partition_ranges, take, SortColumn};
+use arrow::compute::{partition, take};
 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
@@ -364,24 +364,19 @@ pub(crate) fn divide_by_partition_values(
     let indices = lexsort_to_indices(sort_columns.columns());
     let sorted_partition_columns = partition_columns
         .iter()
-        .map(|c| {
-            Ok(SortColumn {
-                values: take(values.column(schema.index_of(c)?), &indices, None)?,
-                options: None,
-            })
-        })
+        .map(|c| Ok(take(values.column(schema.index_of(c)?), &indices, None)?))
         .collect::<Result<Vec<_>, DeltaWriterError>>()?;
 
-    let partition_ranges = lexicographical_partition_ranges(sorted_partition_columns.as_slice())?;
+    let partition_ranges = partition(sorted_partition_columns.as_slice())?;
 
-    for range in partition_ranges {
+    for range in partition_ranges.ranges().into_iter() {
         // get row indices for current partition
         let idx: UInt32Array = (range.start..range.end)
             .map(|i| Some(indices.value(i)))
             .collect();
 
-        let partition_key_iter = sorted_partition_columns.iter().map(|c| {
-            stringified_partition_value(&c.values.slice(range.start, range.end - range.start))
+        let partition_key_iter = sorted_partition_columns.iter().map(|col| {
+            stringified_partition_value(&col.slice(range.start, range.end - range.start))
         });
 
         let mut partition_values = HashMap::new();
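For reference, arrow 46 replaced lexicographical_partition_ranges with the partition kernel used above: it takes already-sorted arrays directly (no SortColumn wrapper) and returns a Partitions value whose ranges() yields each run of equal rows. A self-contained sketch with made-up data:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::partition;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // `partition` assumes its inputs are already sorted, which the writer
        // code guarantees beforehand via `lexsort_to_indices` plus `take`.
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2, 2, 2, 3]));
        let ranges = partition(&[col])?.ranges();
        // Three runs of equal values: rows 0..2, 2..5, and 5..6.
        assert_eq!(ranges, vec![0..2, 2..5, 5..6]);
        Ok(())
    }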
