From 8b4de59517d2e0f49e077e4848e939f96924a311 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sat, 8 Apr 2023 09:41:38 +0200
Subject: [PATCH 1/6] feat: allow column selection in load command

---
 rust/src/delta_datafusion.rs |  19 +++++-
 rust/src/operations/load.rs  | 114 +++++++++++++++++++----------------
 rust/src/operations/mod.rs   |   2 +-
 3 files changed, 81 insertions(+), 54 deletions(-)

diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 029a4974a1..82e3b22c83 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -28,7 +28,9 @@ use std::sync::Arc;
 
 use arrow::array::ArrayRef;
 use arrow::compute::{cast_with_options, CastOptions};
-use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
+use arrow::datatypes::{
+    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, TimeUnit,
+};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -380,6 +382,21 @@ impl TableProvider for DeltaTable {
             DeltaTable::schema(self).unwrap(),
         )?);
 
+        let schema = Arc::new(ArrowSchema::new(
+            schema
+                .fields()
+                .iter()
+                .map(|field| match field.data_type() {
+                    ArrowDataType::Timestamp(TimeUnit::Microsecond, tz) => ArrowField::new(
+                        field.name().clone(),
+                        ArrowDataType::Timestamp(TimeUnit::Nanosecond, tz.clone()),
+                        field.is_nullable(),
+                    ),
+                    _ => field.clone(),
+                })
+                .collect(),
+        ));
+
         register_store(self, session.runtime_env().clone());
 
         // TODO we group files together by their partition values. If the table is partitioned
diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs
index ea4c5e49dd..2be717e549 100644
--- a/rust/src/operations/load.rs
+++ b/rust/src/operations/load.rs
@@ -1,71 +1,40 @@
-use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::storage::DeltaObjectStore;
-use crate::{builder::ensure_table_uri, DeltaResult, DeltaTable};
-
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use futures::future::BoxFuture;
 
+use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
+use crate::{DeltaResult, DeltaTable, DeltaTableError};
+
 #[derive(Debug, Clone)]
 pub struct LoadBuilder {
-    location: Option<String>,
+    /// A snapshot of the to-be-loaded table's state
+    snapshot: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
+    /// A sub-selection of columns to be loaded
     columns: Option<Vec<String>>,
-    storage_options: Option<HashMap<String, String>>,
-    object_store: Option<Arc<DeltaObjectStore>>,
-}
-
-impl Default for LoadBuilder {
-    fn default() -> Self {
-        Self::new()
-    }
 }
 
 impl LoadBuilder {
     /// Create a new [`LoadBuilder`]
-    pub fn new() -> Self {
+    pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
         Self {
-            location: None,
+            snapshot,
+            store,
             columns: None,
-            storage_options: None,
-            object_store: None,
         }
     }
 
-    /// Specify the path to the location where table data is stored,
-    /// which could be a path on distributed storage.
-    pub fn with_location(mut self, location: impl Into<String>) -> Self {
-        self.location = Some(location.into());
-        self
-    }
-
     /// Specify column selection to load
     pub fn with_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
         self.columns = Some(columns.into_iter().map(|s| s.into()).collect());
         self
     }
-
-    /// Set options used to initialize storage backend
-    ///
-    /// Options may be passed in the HashMap or set as environment variables.
-    ///
-    /// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
-    /// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
-    /// [crate::builder::azure_storage_options] describes the available options for the Azure backend.
-    /// [crate::builder::gcp_storage_options] describes the available options for the Google Cloud Platform backend.
-    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
-        self.storage_options = Some(storage_options);
-        self
-    }
-
-    /// Provide a [`DeltaObjectStore`] instance, that points at table location
-    pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
 }
 
 impl std::future::IntoFuture for LoadBuilder {
@@ -76,20 +45,31 @@ impl std::future::IntoFuture for LoadBuilder {
         let this = self;
 
         Box::pin(async move {
-            let object_store = this.object_store.unwrap();
-            let url = ensure_table_uri(object_store.root_uri())?;
-            let store = object_store.storage_backend().clone();
-            let mut table = DeltaTable::new(object_store, Default::default());
-            table.load().await?;
+            let table = DeltaTable::new_with_state(this.store, this.snapshot);
+            let schema = table.state.arrow_schema()?;
+            let projection = this
+                .columns
+                .map(|cols| {
+                    cols.iter()
+                        .map(|col| {
+                            schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
+                                DeltaTableError::SchemaMismatch {
+                                    msg: format!("Column '{col}' does not exist in table schema."),
+                                },
+                            )
+                        })
+                        .collect::<Result<Vec<_>, _>>()
+                })
+                .transpose()?;
 
             let ctx = SessionContext::new();
-            ctx.state()
-                .runtime_env()
-                .register_object_store(url.scheme(), "", store);
-            let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
+            let scan_plan = table
+                .scan(&ctx.state(), projection.as_ref(), &[], None)
+                .await?;
             let plan = CoalescePartitionsExec::new(scan_plan);
             let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
             let stream = plan.execute(0, task_ctx)?;
+
             Ok((table, stream))
         })
     }
@@ -157,4 +137,34 @@ mod tests {
         assert_eq!(batch.schema(), data[0].schema());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_load_with_columns() -> TestResult {
+        let batch = get_record_batch(None, false);
+        let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
+
+        let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
+        let data = collect_sendable_stream(stream).await?;
+
+        let expected = vec![
+            "+----+-------+",
+            "| id | value |",
+            "+----+-------+",
+            "| A  | 1     |",
+            "| B  | 2     |",
+            "| A  | 3     |",
+            "| B  | 4     |",
+            "| A  | 5     |",
+            "| A  | 6     |",
+            "| A  | 7     |",
+            "| B  | 8     |",
+            "| B  | 9     |",
+            "| A  | 10    |",
+            "| A  | 11    |",
+            "+----+-------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &data);
+        Ok(())
+    }
 }
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index 7a554809a5..1f17d4b967 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -102,7 +102,7 @@ impl DeltaOps {
     #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn load(self) -> LoadBuilder {
-        LoadBuilder::default().with_object_store(self.0.object_store())
+        LoadBuilder::new(self.0.object_store(), self.0.state)
    }
 
     /// Write data to Delta table
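
Taken together, patch 1 lets callers project columns at load time. A minimal
usage sketch, mirroring the test_load_with_columns test added above (DeltaOps
and collect_sendable_stream are the crate's own entry point and test helper;
the table is assumed to already contain data):

    // Stream only the `id` and `value` columns of an existing table.
    let (_table, stream) = DeltaOps(table)
        .load()
        .with_columns(["id", "value"])
        .await?;
    let batches = collect_sendable_stream(stream).await?;

Requesting a column that is not part of the table schema surfaces as
DeltaTableError::SchemaMismatch rather than silently producing an empty result.
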
From eecd2cc6e977a3a52265ac025febcf1ff78befa5 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sat, 8 Apr 2023 11:10:48 +0200
Subject: [PATCH 2/6] feat: read physical file schema for scan operations

---
 rust/src/delta_datafusion.rs             | 46 +++++++++++-------------
 rust/src/operations/transaction/state.rs | 22 ++++++++++++
 rust/tests/datafusion_test.rs            | 11 +++++-
 3 files changed, 53 insertions(+), 26 deletions(-)

diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 82e3b22c83..38b1373d35 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -28,9 +28,7 @@ use std::sync::Arc;
 
 use arrow::array::ArrayRef;
 use arrow::compute::{cast_with_options, CastOptions};
-use arrow::datatypes::{
-    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, TimeUnit,
-};
+use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -380,17 +380,21 @@ impl TableProvider for DeltaTable {
             DeltaTable::schema(self).unwrap(),
         )?);
 
-        let schema = Arc::new(ArrowSchema::new(
+        let file_schema = self
+            .state
+            .physical_arrow_schema(self.object_store())
+            .await?;
+
+        let table_schema = Arc::new(ArrowSchema::new(
             schema
-                .fields()
-                .iter()
-                .map(|field| match field.data_type() {
-                    ArrowDataType::Timestamp(TimeUnit::Microsecond, tz) => ArrowField::new(
-                        field.name().clone(),
-                        ArrowDataType::Timestamp(TimeUnit::Nanosecond, tz.clone()),
-                        field.is_nullable(),
-                    ),
-                    _ => field.clone(),
+                .fields
+                .clone()
+                .into_iter()
+                .map(|field| {
+                    file_schema
+                        .field_with_name(field.name())
+                        .cloned()
+                        .unwrap_or(field)
                 })
                 .collect(),
         ));
 
         if let Some(Some(predicate)) =
             (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
         {
-            let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
+            let pruning_predicate = PruningPredicate::try_new(predicate, table_schema.clone())?;
             let files_to_prune = pruning_predicate.prune(&self.state)?;
             self.get_state()
                 .files()
                 .iter()
                 .zip(files_to_prune.into_iter())
                 .for_each(|(action, keep_file)| {
                     if keep_file {
-                        let part = partitioned_file_from_action(action, &schema);
+                        let part = partitioned_file_from_action(action, &table_schema);
                         file_groups
                             .entry(part.partition_values.clone())
                             .or_default()
                             .push(part);
                     }
                 });
         } else {
             self.get_state().files().iter().for_each(|action| {
-                let part = partitioned_file_from_action(action, &schema);
+                let part = partitioned_file_from_action(action, &table_schema);
                 file_groups
                     .entry(part.partition_values.clone())
                     .or_default()
                     .push(part);
             });
         };
 
         let table_partition_cols = self.get_metadata()?.partition_columns.clone();
-        let file_schema = Arc::new(ArrowSchema::new(
-            schema
-                .fields()
-                .iter()
-                .filter(|f| !table_partition_cols.contains(f.name()))
-                .cloned()
-                .collect(),
-        ));
 
         let parquet_scan = ParquetFormat::new()
             .create_physical_plan(
                 .map(|c| {
                     Ok((
                         c.to_owned(),
-                        partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
+                        partition_type_wrap(
+                            table_schema.field_with_name(c)?.data_type().clone(),
+                        ),
                     ))
                 })
                 .collect::<Result<Vec<_>, ArrowError>>()?,
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index f521ac2062..694607882d 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -11,6 +11,8 @@ use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
 use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use itertools::Either;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::Parser;
 use sqlparser::tokenizer::Tokenizer;
@@ -82,6 +84,26 @@ impl DeltaTableState {
 
         Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
     }
+
+    /// Get the physical schema for data files stored in the table.
+    pub async fn physical_arrow_schema(
+        &self,
+        object_store: Arc<DeltaObjectStore>,
+    ) -> DeltaResult<ArrowSchemaRef> {
+        let file_meta = self
+            .files()
+            .iter()
+            .max_by_key(|obj| obj.modification_time)
+            .ok_or(DeltaTableError::Generic("No active file actions to get physical schema. Maybe the current state has not yet been loaded?".into()))?
+            .try_into()?;
+
+        let file_reader = ParquetObjectReader::new(object_store, file_meta);
+        let batch_stream = ParquetRecordBatchStreamBuilder::new(file_reader)
+            .await?
+            .build()?;
+
+        Ok(batch_stream.schema().clone())
+    }
 }
 
 pub struct AddContainer<'a> {
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 466ccfc733..7abccc2749 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -728,7 +728,16 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
         ),
     ]);
 
-    assert_eq!(Arc::new(expected_schema), batches[0].schema());
+    assert_eq!(
+        Arc::new(expected_schema),
+        Arc::new(
+            batches[0]
+                .schema()
+                .as_ref()
+                .clone()
+                .with_metadata(Default::default())
+        )
+    );
 
     Ok(())
 }
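
The core idea of patch 2 is to trust the Parquet footer of the newest data
file for the physical types, while the Delta metadata keeps contributing
fields that the data files do not carry (notably partition columns). The
merge is a name-based field lookup; a standalone sketch of that logic,
assuming the arrow Schema/Field types as aliased above (the helper name
`reconcile` is hypothetical, and Schema::new taking Vec<Field> matches the
arrow release this series builds against):

    fn reconcile(table_schema: &ArrowSchema, file_schema: &ArrowSchema) -> ArrowSchema {
        ArrowSchema::new(
            table_schema
                .fields()
                .iter()
                .map(|field| {
                    // Prefer the physical definition from the data file when a
                    // field of the same name exists there; otherwise keep the
                    // logical field (e.g. a partition column).
                    file_schema
                        .field_with_name(field.name())
                        .cloned()
                        .unwrap_or_else(|_| field.clone())
                })
                .collect(),
        )
    }

This is what lets files that store nanosecond timestamps scan cleanly, even
though the Delta protocol models timestamps with microsecond precision.
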
From 8328ffefdcd022165991e7234fffdf3bc6ac68a4 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sat, 8 Apr 2023 11:46:00 +0200
Subject: [PATCH 3/6] refactor: return full table schema when creating
 physical schema

---
 rust/src/delta_datafusion.rs             | 38 ++++++++----------------
 rust/src/operations/transaction/state.rs | 30 +++++++++++++++----
 2 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 38b1373d35..9feb617470 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -376,29 +376,11 @@ impl TableProvider for DeltaTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
-            DeltaTable::schema(self).unwrap(),
-        )?);
-
-        let file_schema = self
+        let schema = self
             .state
             .physical_arrow_schema(self.object_store())
             .await?;
 
-        let table_schema = Arc::new(ArrowSchema::new(
-            schema
-                .fields
-                .clone()
-                .into_iter()
-                .map(|field| {
-                    file_schema
-                        .field_with_name(field.name())
-                        .cloned()
-                        .unwrap_or(field)
-                })
-                .collect(),
-        ));
-
         register_store(self, session.runtime_env().clone());
 
         // TODO we group files together by their partition values. If the table is partitioned
         if let Some(Some(predicate)) =
             (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
         {
-            let pruning_predicate = PruningPredicate::try_new(predicate, table_schema.clone())?;
+            let pruning_predicate = PruningPredicate::try_new(predicate, schema.clone())?;
             let files_to_prune = pruning_predicate.prune(&self.state)?;
             self.get_state()
                 .files()
                 .iter()
                 .zip(files_to_prune.into_iter())
                 .for_each(|(action, keep_file)| {
                     if keep_file {
-                        let part = partitioned_file_from_action(action, &table_schema);
+                        let part = partitioned_file_from_action(action, &schema);
                         file_groups
                             .entry(part.partition_values.clone())
                             .or_default()
                             .push(part);
                     }
                 });
         } else {
             self.get_state().files().iter().for_each(|action| {
-                let part = partitioned_file_from_action(action, &table_schema);
+                let part = partitioned_file_from_action(action, &schema);
                 file_groups
                     .entry(part.partition_values.clone())
                     .or_default()
                     .push(part);
             });
         };
 
         let table_partition_cols = self.get_metadata()?.partition_columns.clone();
+        let file_schema = Arc::new(ArrowSchema::new(
+            schema
+                .fields()
+                .iter()
+                .filter(|f| !table_partition_cols.contains(f.name()))
+                .cloned()
+                .collect(),
+        ));
 
         let parquet_scan = ParquetFormat::new()
             .create_physical_plan(
                 .map(|c| {
                     Ok((
                         c.to_owned(),
-                        partition_type_wrap(
-                            table_schema.field_with_name(c)?.data_type().clone(),
-                        ),
+                        partition_type_wrap(schema.field_with_name(c)?.data_type().clone()),
                     ))
                 })
                 .collect::<Result<Vec<_>, ArrowError>>()?,
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index 694607882d..31b4675877 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -85,7 +85,10 @@ impl DeltaTableState {
         Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
     }
 
-    /// Get the physical schema for data files stored in the table.
+    /// Get the physical table schema.
+    ///
+    /// This will construct a schema derived from the parqet schema of the latest data file,
+    /// and fields for partition columns from the schema defined in table metadata.
     pub async fn physical_arrow_schema(
         &self,
         object_store: Arc<DeltaObjectStore>,
     ) -> DeltaResult<ArrowSchemaRef> {
         let file_reader = ParquetObjectReader::new(object_store, file_meta);
-        let batch_stream = ParquetRecordBatchStreamBuilder::new(file_reader)
+        let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
             .await?
-            .build()?;
-
-        Ok(batch_stream.schema().clone())
+            .build()?
+            .schema()
+            .clone();
+
+        let schema = self.arrow_schema()?;
+        let table_schema = Arc::new(ArrowSchema::new(
+            schema
+                .fields
+                .clone()
+                .into_iter()
+                .map(|field| {
+                    file_schema
+                        .field_with_name(field.name())
+                        .cloned()
+                        .unwrap_or(field)
+                })
+                .collect(),
+        ));
+
+        Ok(table_schema)
     }
 }
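
After this refactor, the DataFusion scan path no longer merges schemas
itself; the reconciliation lives in DeltaTableState and the scan reduces to
the single call quoted from the hunk above:

    let schema = self
        .state
        .physical_arrow_schema(self.object_store())
        .await?;

The file schema handed to the Parquet scan is then derived from it by
filtering out the partition columns, which never appear in the data files.
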
From 54d4c4f086cdf2b461fc8f4aa9988aa8f46165fa Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sat, 8 Apr 2023 12:03:44 +0200
Subject: [PATCH 4/6] test: add test for scanning table with nano timestamps
 in data files

---
 rust/src/operations/transaction/state.rs |  3 +--
 rust/tests/datafusion_test.rs            | 24 ++++++++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index 31b4675877..4be094b9f8 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -107,9 +107,8 @@ impl DeltaTableState {
             .schema()
             .clone();
 
-        let schema = self.arrow_schema()?;
         let table_schema = Arc::new(ArrowSchema::new(
-            schema
+            self.arrow_schema()?
                 .fields
                 .clone()
                 .into_iter()
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 7abccc2749..a6d18aad7e 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -741,3 +741,27 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_datafusion_scan_timestamps() -> Result<()> {
+    let ctx = SessionContext::new();
+    let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
+        .await
+        .unwrap();
+    ctx.register_table("demo", Arc::new(table))?;
+
+    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
+
+    let expected = vec![
+        "+-------------------------------+---------------------+------------+",
+        "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
+        "+-------------------------------+---------------------+------------+",
+        "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
+        "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
+        "+-------------------------------+---------------------+------------+",
+    ];
+
+    assert_batches_sorted_eq!(&expected, &batches);
+
+    Ok(())
+}
From 5f15fb6ef23fa6872d5d33a0e913fc624ab086c8 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Thu, 13 Apr 2023 01:11:48 +0200
Subject: [PATCH 5/6] fix: fall back to table schema when no files are present

---
 rust/src/operations/transaction/state.rs | 56 ++++++++++++------------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index 4be094b9f8..b50c5b960b 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -93,35 +93,33 @@ impl DeltaTableState {
         &self,
         object_store: Arc<DeltaObjectStore>,
     ) -> DeltaResult<ArrowSchemaRef> {
-        let file_meta = self
-            .files()
-            .iter()
-            .max_by_key(|obj| obj.modification_time)
-            .ok_or(DeltaTableError::Generic("No active file actions to get physical schema. Maybe the current state has not yet been loaded?".into()))?
-            .try_into()?;
-
-        let file_reader = ParquetObjectReader::new(object_store, file_meta);
-        let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
-            .await?
-            .build()?
-            .schema()
-            .clone();
-
-        let table_schema = Arc::new(ArrowSchema::new(
-            self.arrow_schema()?
-                .fields
-                .clone()
-                .into_iter()
-                .map(|field| {
-                    file_schema
-                        .field_with_name(field.name())
-                        .cloned()
-                        .unwrap_or(field)
-                })
-                .collect(),
-        ));
-
-        Ok(table_schema)
+        if let Some(add) = self.files().iter().max_by_key(|obj| obj.modification_time) {
+            let file_meta = add.try_into()?;
+            let file_reader = ParquetObjectReader::new(object_store, file_meta);
+            let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
+                .await?
+                .build()?
+                .schema()
+                .clone();
+
+            let table_schema = Arc::new(ArrowSchema::new(
+                self.arrow_schema()?
+                    .fields
+                    .clone()
+                    .into_iter()
+                    .map(|field| {
+                        file_schema
+                            .field_with_name(field.name())
+                            .cloned()
+                            .unwrap_or(field)
+                    })
+                    .collect(),
+            ));
+
+            Ok(table_schema)
+        } else {
+            self.arrow_schema()
+        }
     }
 }
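
With patch 5 the lookup degrades gracefully: a table with no Add actions in
its snapshot (for example, freshly created and never written to) simply
reports its logical schema instead of erroring. A hypothetical assertion of
that behaviour (a sketch assuming crate-internal access to the table state,
as in the operations tests above):

    // No data files in the snapshot: the physical schema falls back to the
    // logical schema derived from the table metadata.
    let schema = table.state.physical_arrow_schema(table.object_store()).await?;
    assert_eq!(schema, table.state.arrow_schema()?);
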
From e47f8aac75e8f60179332124a788fd9648aa7040 Mon Sep 17 00:00:00 2001
From: Robert Pack <42610831+roeap@users.noreply.github.com>
Date: Thu, 13 Apr 2023 01:16:51 +0200
Subject: [PATCH 6/6] Update rust/src/operations/transaction/state.rs

Co-authored-by: Will Jones
---
 rust/src/operations/transaction/state.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index b50c5b960b..b22c8f8e8e 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -87,7 +87,7 @@ impl DeltaTableState {
 
     /// Get the physical table schema.
     ///
-    /// This will construct a schema derived from the parqet schema of the latest data file,
+    /// This will construct a schema derived from the parquet schema of the latest data file,
     /// and fields for partition columns from the schema defined in table metadata.
     pub async fn physical_arrow_schema(
        &self,