
Commit

rebase on main
Blajda committed Apr 30, 2023
1 parent d941320 commit 6cfbfce
Showing 2 changed files with 27 additions and 57 deletions.
58 changes: 7 additions & 51 deletions rust/src/delta_datafusion.rs
@@ -59,12 +59,8 @@ use url::Url;

use crate::action::Add;
use crate::builder::ensure_table_uri;
<<<<<<< HEAD
=======
use crate::schema;
use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
>>>>>>> 56907ec (Rebase delete operation on main)
use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
use crate::{DeltaResult, Invariant};
use crate::{DeltaTable, DeltaTableError};
@@ -383,9 +379,14 @@ pub(crate) async fn parquet_scan_from_actions(
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect(),
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));

let table_partition_cols = table_partition_cols
.iter()
.map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
.collect::<Result<Vec<_>, ArrowError>>()?;

ParquetFormat::new()
.create_physical_plan(
state,
@@ -396,17 +397,7 @@
statistics: snapshot.datafusion_table_statistics(),
projection: projection.cloned(),
limit,
table_partition_cols: table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
wrap_partition_type_in_dict(
schema.field_with_name(c)?.data_type().clone(),
),
))
})
.collect::<Result<Vec<_>, ArrowError>>()?,
table_partition_cols,
output_ordering: None,
infinite_source: false,
},
@@ -475,40 +466,6 @@ impl TableProvider for DeltaTable {
self.get_state().files().to_owned()
};

<<<<<<< HEAD
let table_partition_cols = self.get_metadata()?.partition_columns.clone();
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));

let table_partition_cols = table_partition_cols
.iter()
.map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
.collect::<Result<Vec<_>, ArrowError>>()?;

let parquet_scan = ParquetFormat::new()
.create_physical_plan(
session,
FileScanConfig {
object_store_url: self.storage.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
statistics: self.datafusion_table_statistics(),
projection: projection.cloned(),
limit,
table_partition_cols,
output_ordering: None,
infinite_source: false,
},
filter_expr.as_ref(),
)
.await?;
=======
let parquet_scan = parquet_scan_from_actions(
&self.state,
self.object_store(),
Expand All @@ -520,7 +477,6 @@ impl TableProvider for DeltaTable {
limit,
)
.await?;
>>>>>>> 56907ec (Rebase delete operation on main)

Ok(Arc::new(DeltaScan {
table_uri: ensure_table_uri(self.table_uri())?.as_str().into(),
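Context for the hunks above: the conflict is resolved in favor of the shared parquet_scan_from_actions helper, and that helper now passes the partition columns' plain value types, dropping the inline wrap_partition_type_in_dict call in favor of the unwrapped table_partition_cols built at the top of the function. As a hedged sketch (not part of this commit), the DataFusion helper being removed is conventionally understood to wrap a partition column's value type in a UInt16-keyed dictionary, which is why dropping it changes the declared partition types:

    // Sketch of the DataFusion helper the hunk above stops calling; the
    // UInt16 key type is an assumption based on DataFusion's convention of
    // dictionary-encoding partition columns, not code from this commit.
    use arrow::datatypes::DataType;

    fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
    }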
26 changes: 20 additions & 6 deletions rust/src/operations/delete.rs
@@ -419,7 +419,7 @@ async fn excute_non_empty_expr(
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect(),
.collect::<Vec<_>>(),
));
let expr = logical_expr_to_physical_expr(expression, &schema);

@@ -708,6 +708,16 @@ fn new_array_builder(t: DataType, capacity: usize) -> DeltaResult<Box<dyn ArrayBuilder>>
})
}

fn get_inner_type(t: DataType) -> DeltaResult<DataType> {
match t {
DataType::Dictionary(_, value) => Ok(value.as_ref().to_owned()),
_ => Err(DeltaTableError::Generic(format!(
"Unable to create builder for arrow type {}",
t
))),
}
}

// Create a record batch that contains the partition columns plus the path of the file
fn create_partition_record_batch(
snapshot: &DeltaTableState,
@@ -723,6 +733,7 @@
.unwrap()
.data_type()
.to_owned();
let t = get_inner_type(t)?;
let builder = new_array_builder(t, cap)?;
builders.push(builder);
}
@@ -738,10 +749,14 @@
.unwrap()
.data_type()
.to_owned();

// Partition values are wrapped in a dictionary

let value = match value {
Some(v) => v.to_owned(),
None => None,
};
let t = get_inner_type(t)?;
append_option_str(builder, value, t)?;
}
path_builder.append_value(action.path.clone());
@@ -756,11 +771,10 @@
let mut fields = Vec::new();
for partition in partition_columns {
let column = arrow_schema.field_with_name(partition).unwrap();
let field = Field::new(
partition,
column.data_type().to_owned(),
column.is_nullable(),
);

let t = column.data_type().to_owned();
let t = get_inner_type(t)?;
let field = Field::new(partition, t, column.is_nullable());
fields.push(field);
}
fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
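The new get_inner_type helper exists because partition columns arrive dictionary-encoded in the snapshot's arrow schema (per the comment added in the hunk above), while the array builders and Fields for the partition record batch need the underlying value type. A minimal sketch of the expected behavior, assuming a string partition column typed as Dictionary(UInt16, Utf8) (the key type is an assumption; only the value type matters here):

    // Sketch, not part of the commit: get_inner_type peels the dictionary
    // wrapper off a partition column's type and errors on anything else.
    #[cfg(test)]
    mod get_inner_type_sketch {
        use super::get_inner_type;
        use arrow::datatypes::DataType;

        #[test]
        fn unwraps_dictionary_to_value_type() {
            // A Utf8 partition column as it appears in the snapshot schema.
            let dict =
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
            assert_eq!(get_inner_type(dict).unwrap(), DataType::Utf8);
            // Bare value types are rejected; the delete path expects the wrapper.
            assert!(get_inner_type(DataType::Utf8).is_err());
        }
    }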
