From 6cfbfced6e1cc11b5dbdc2226cbd3b005da072f7 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 30 Apr 2023 18:19:38 -0400 Subject: [PATCH] rebase on main --- rust/src/delta_datafusion.rs | 58 +++++------------------------ rust/src/operations/delete.rs | 26 ++++++++++++---- 2 files changed, 27 insertions(+), 57 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 2ab546f06a..f5c62a8e4c 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -59,12 +59,8 @@ use url::Url; use crate::action::Add; use crate::builder::ensure_table_uri; -<<<<<<< HEAD -======= -use crate::schema; use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; ->>>>>>> 56907ec (Rebase delete operation on main) use crate::{action, open_table, open_table_with_storage_options, SchemaDataType}; use crate::{DeltaResult, Invariant}; use crate::{DeltaTable, DeltaTableError}; @@ -383,9 +379,14 @@ pub(crate) async fn parquet_scan_from_actions( .iter() .filter(|f| !table_partition_cols.contains(f.name())) .cloned() - .collect(), + .collect::<Vec<arrow::datatypes::Field>>(), )); + let table_partition_cols = table_partition_cols + .iter() + .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) + .collect::<Result<Vec<_>, ArrowError>>()?; + ParquetFormat::new() .create_physical_plan( state, @@ -396,17 +397,7 @@ pub(crate) async fn parquet_scan_from_actions( statistics: snapshot.datafusion_table_statistics(), projection: projection.cloned(), limit, - table_partition_cols: table_partition_cols - .iter() - .map(|c| { - Ok(( - c.to_owned(), - wrap_partition_type_in_dict( - schema.field_with_name(c)?.data_type().clone(), - ), - )) - }) - .collect::<Result<Vec<_>, ArrowError>>()?, + table_partition_cols, output_ordering: None, infinite_source: false, }, @@ -475,40 +466,6 @@ impl TableProvider for DeltaTable { self.get_state().files().to_owned() }; -<<<<<<< HEAD - let table_partition_cols = self.get_metadata()?.partition_columns.clone(); - let file_schema = 
Arc::new(ArrowSchema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::<Vec<arrow::datatypes::Field>>(), - )); - - let table_partition_cols = table_partition_cols - .iter() - .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) - .collect::<Result<Vec<_>, ArrowError>>()?; - - let parquet_scan = ParquetFormat::new() - .create_physical_plan( - session, - FileScanConfig { - object_store_url: self.storage.object_store_url(), - file_schema, - file_groups: file_groups.into_values().collect(), - statistics: self.datafusion_table_statistics(), - projection: projection.cloned(), - limit, - table_partition_cols, - output_ordering: None, - infinite_source: false, - }, - filter_expr.as_ref(), - ) - .await?; -======= let parquet_scan = parquet_scan_from_actions( &self.state, self.object_store(), @@ -520,7 +477,6 @@ impl TableProvider for DeltaTable { limit, ) .await?; ->>>>>>> 56907ec (Rebase delete operation on main) Ok(Arc::new(DeltaScan { table_uri: ensure_table_uri(self.table_uri())?.as_str().into(), diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index f44ef2390d..e1b87c5ec6 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -419,7 +419,7 @@ async fn excute_non_empty_expr( .iter() .filter(|f| !table_partition_cols.contains(f.name())) .cloned() - .collect(), + .collect::<Vec<arrow::datatypes::Field>>(), )); let expr = logical_expr_to_physical_expr(expression, &schema); @@ -708,6 +708,16 @@ fn new_array_builder(t: DataType, capacity: usize) -> DeltaResult<Box<dyn ArrayBuilder>> { +fn get_inner_type(t: DataType) -> DeltaResult<DataType> { + match t { + DataType::Dictionary(_, value) => Ok(value.as_ref().to_owned()), + _ => Err(DeltaTableError::Generic(format!( + "Unable to create builder for arrow type {}", + t + ))), + } +} + // Create a record batch that contains the partition columns plus the path of the file fn create_partition_record_batch( snapshot: &DeltaTableState, @@ -723,6 +733,7 @@ fn create_partition_record_batch( .unwrap() .data_type() .to_owned(); + let t
= get_inner_type(t)?; let builder = new_array_builder(t, cap)?; builders.push(builder); } @@ -738,10 +749,14 @@ fn create_partition_record_batch( .unwrap() .data_type() .to_owned(); + + // Partitions values are wrapped in a dictionary + let value = match value { Some(v) => v.to_owned(), None => None, }; + let t = get_inner_type(t)?; append_option_str(builder, value, t)?; } path_builder.append_value(action.path.clone()); @@ -756,11 +771,10 @@ fn create_partition_record_batch( let mut fields = Vec::new(); for partition in partition_columns { let column = arrow_schema.field_with_name(partition).unwrap(); - let field = Field::new( - partition, - column.data_type().to_owned(), - column.is_nullable(), - ); + + let t = column.data_type().to_owned(); + let t = get_inner_type(t)?; + let field = Field::new(partition, t, column.is_nullable()); fields.push(field); } fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));