From 65179b6fd0180b62cea8ecdd4ce16918fb6138d3 Mon Sep 17 00:00:00 2001
From: Robert Pack <42610831+roeap@users.noreply.github.com>
Date: Tue, 26 Sep 2023 03:17:32 +0200
Subject: [PATCH] refactor: re-organize top level modules (#1434)

# Description

~This contains changes from #1432, will rebase once that's merged.~

This PR constitutes the bulk of re-organising our top-level modules.

- move the `DeltaTable*` structs into the new `table` module
- move table configuration into the `table` module
- move schema-related modules into the `schema` module
- rename the `action` module to `protocol`, hoping to isolate everything that can one day become the log kernel.

~It also removes the deprecated commit logic from `DeltaTable` and updates call sites and tests accordingly.~

I am planning one more follow-up, in which I hope to make `transactions` (currently within `operations`) a top-level module. While the number of touched files here is already massive, I want to do that separately, as it will also include some updates to the transaction logic itself that should be reviewed more carefully.

# Related Issue(s)

closes: #1136

# Documentation

---
 python/src/error.rs | 2 +- python/src/lib.rs | 18 ++-- rust/benches/read_checkpoint.rs | 4 +- rust/examples/basic_operations.rs | 2 +- rust/src/data_catalog/storage/mod.rs | 3 +- rust/src/delta_datafusion.rs | 12 +-- rust/src/errors.rs | 2 +- rust/src/lib.rs | 94 ++++++++++++++----- rust/src/operations/create.rs | 13 +-- rust/src/operations/delete.rs | 11 ++- rust/src/operations/filesystem_check.rs | 4 +- rust/src/operations/load.rs | 2 +- rust/src/operations/merge.rs | 33 +++---- rust/src/operations/mod.rs | 4 +- rust/src/operations/optimize.rs | 6 +- rust/src/operations/restore.rs | 12 +-- .../transaction/conflict_checker.rs | 37 +++++--- rust/src/operations/transaction/mod.rs | 4 +- rust/src/operations/transaction/state.rs | 4 +- rust/src/operations/transaction/test_utils.rs | 12 +-- rust/src/operations/update.rs | 6 +- rust/src/operations/vacuum.rs | 2 +- rust/src/operations/write.rs | 8 +- rust/src/operations/writer.rs | 23 ++--- rust/src/{action => protocol}/checkpoints.rs | 11 ++- rust/src/{action => protocol}/mod.rs | 6 +- .../parquet2_read/boolean.rs | 2 +- .../parquet2_read/dictionary/binary.rs | 0 .../parquet2_read/dictionary/mod.rs | 0 .../parquet2_read/dictionary/primitive.rs | 0 .../{action => protocol}/parquet2_read/map.rs | 2 +- .../{action => protocol}/parquet2_read/mod.rs | 6 +- .../parquet2_read/primitive.rs | 2 +- .../parquet2_read/stats.rs | 2 +- .../parquet2_read/string.rs | 2 +- .../parquet2_read/validity.rs | 0 .../{action => protocol}/parquet_read/mod.rs | 2 +- rust/src/{action => protocol}/serde_path.rs | 0 rust/src/{ => protocol}/time_utils.rs | 0 .../arrow_convert.rs} | 0 rust/src/{schema.rs => schema/mod.rs} | 4 + rust/src/{ => schema}/partitions.rs | 54 +++++------ rust/src/storage/config.rs | 2 +- rust/src/storage/mod.rs | 2 +- rust/src/storage/s3.rs | 2 +- rust/src/storage/utils.rs | 6 +- rust/src/{ => table}/builder.rs | 2 +- rust/src/{delta_config.rs => table/config.rs} | 2 +- rust/src/{delta.rs => table/mod.rs} | 89 ++++-------------- rust/src/{table_state.rs => table/state.rs} | 53 ++++++----- .../state_arrow.rs} | 6 +- rust/src/test_utils.rs | 2 +- rust/src/writer/json.rs | 22 +++-- rust/src/writer/mod.rs | 2 +- rust/src/writer/record_batch.rs | 31 +----- rust/src/writer/stats.rs | 6 +- rust/src/writer/test_utils.rs | 8 +- rust/tests/checkpoint_writer.rs | 10 +- rust/tests/command_filesystem_check.rs | 14 +-- rust/tests/command_optimize.rs | 2 +-
rust/tests/command_restore.rs | 2 +- rust/tests/commit_info_format.rs | 2 +- rust/tests/common/mod.rs | 6 +- rust/tests/fs_common/mod.rs | 2 +- rust/tests/integration_concurrent_writes.rs | 6 +- rust/tests/integration_datafusion.rs | 14 +-- rust/tests/integration_object_store.rs | 4 +- rust/tests/integration_read.rs | 8 +- rust/tests/read_delta_log_test.rs | 2 +- rust/tests/read_delta_partitions_test.rs | 2 +- rust/tests/repair_s3_rename_test.rs | 8 +- 71 files changed, 359 insertions(+), 369 deletions(-) rename rust/src/{action => protocol}/checkpoints.rs (98%) rename rust/src/{action => protocol}/mod.rs (99%) rename rust/src/{action => protocol}/parquet2_read/boolean.rs (98%) rename rust/src/{action => protocol}/parquet2_read/dictionary/binary.rs (100%) rename rust/src/{action => protocol}/parquet2_read/dictionary/mod.rs (100%) rename rust/src/{action => protocol}/parquet2_read/dictionary/primitive.rs (100%) rename rust/src/{action => protocol}/parquet2_read/map.rs (99%) rename rust/src/{action => protocol}/parquet2_read/mod.rs (99%) rename rust/src/{action => protocol}/parquet2_read/primitive.rs (99%) rename rust/src/{action => protocol}/parquet2_read/stats.rs (85%) rename rust/src/{action => protocol}/parquet2_read/string.rs (99%) rename rust/src/{action => protocol}/parquet2_read/validity.rs (100%) rename rust/src/{action => protocol}/parquet_read/mod.rs (99%) rename rust/src/{action => protocol}/serde_path.rs (100%) rename rust/src/{ => protocol}/time_utils.rs (100%) rename rust/src/{delta_arrow.rs => schema/arrow_convert.rs} (100%) rename rust/src/{schema.rs => schema/mod.rs} (99%) rename rust/src/{ => schema}/partitions.rs (90%) rename rust/src/{ => table}/builder.rs (99%) rename rust/src/{delta_config.rs => table/config.rs} (99%) rename rust/src/{delta.rs => table/mod.rs} (92%) rename rust/src/{table_state.rs => table/state.rs} (91%) rename rust/src/{table_state_arrow.rs => table/state_arrow.rs} (99%) diff --git a/python/src/error.rs b/python/src/error.rs index 229f7c83dc..2422dfc441 100644 --- a/python/src/error.rs +++ b/python/src/error.rs @@ -1,5 +1,5 @@ use arrow_schema::ArrowError; -use deltalake::action::ProtocolError; +use deltalake::protocol::ProtocolError; use deltalake::{errors::DeltaTableError, ObjectStoreError}; use pyo3::exceptions::{ PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError, diff --git a/python/src/lib.rs b/python/src/lib.rs index 8d7cdb486d..8115c1bb76 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -14,13 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use arrow::pyarrow::PyArrowType; use chrono::{DateTime, Duration, FixedOffset, Utc}; -use deltalake::action::{ - self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats, -}; use deltalake::arrow::compute::concat_batches; use deltalake::arrow::record_batch::RecordBatch; use deltalake::arrow::{self, datatypes::Schema as ArrowSchema}; -use deltalake::builder::DeltaTableBuilder; use deltalake::checkpoints::create_checkpoint; use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; @@ -30,6 +26,10 @@ use deltalake::operations::restore::RestoreBuilder; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; +use deltalake::protocol::{ + self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats, +}; +use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, Invariant, Schema}; use 
pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError}; use pyo3::prelude::*; @@ -497,7 +497,7 @@ impl RawDeltaTable { let existing_schema = self._table.get_schema().map_err(PythonError::from)?; - let mut actions: Vec = add_actions + let mut actions: Vec = add_actions .iter() .map(|add| Action::add(add.into())) .collect(); @@ -515,7 +515,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; for old_add in add_actions { - let remove_action = Action::remove(action::Remove { + let remove_action = Action::remove(protocol::Remove { path: old_add.path.clone(), deletion_timestamp: Some(current_timestamp()), data_change: true, @@ -536,7 +536,7 @@ impl RawDeltaTable { .map_err(PythonError::from)? .clone(); metadata.schema = schema; - let metadata_action = action::MetaData::try_from(metadata) + let metadata_action = protocol::MetaData::try_from(metadata) .map_err(|_| PyValueError::new_err("Failed to reparse metadata"))?; actions.push(Action::metaData(metadata_action)); } @@ -795,9 +795,9 @@ pub struct PyAddAction { stats: Option, } -impl From<&PyAddAction> for action::Add { +impl From<&PyAddAction> for protocol::Add { fn from(action: &PyAddAction) -> Self { - action::Add { + protocol::Add { path: action.path.clone(), size: action.size, partition_values: action.partition_values.clone(), diff --git a/rust/benches/read_checkpoint.rs b/rust/benches/read_checkpoint.rs index 9824f15eb0..2ecbee661b 100644 --- a/rust/benches/read_checkpoint.rs +++ b/rust/benches/read_checkpoint.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use deltalake::delta::DeltaTableConfig; -use deltalake::table_state::DeltaTableState; +use deltalake::table::state::DeltaTableState; +use deltalake::DeltaTableConfig; use std::fs::File; use std::io::Read; diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs index d95aadfb78..0732791b74 100644 --- a/rust/examples/basic_operations.rs +++ b/rust/examples/basic_operations.rs @@ -4,7 +4,7 @@ use arrow::{ record_batch::RecordBatch, }; use deltalake::operations::collect_sendable_stream; -use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField}; +use deltalake::{protocol::SaveMode, DeltaOps, SchemaDataType, SchemaField}; use parquet::{ basic::{Compression, ZstdLevel}, file::properties::WriterProperties, diff --git a/rust/src/data_catalog/storage/mod.rs b/rust/src/data_catalog/storage/mod.rs index 90d9db8fdb..f645d370c1 100644 --- a/rust/src/data_catalog/storage/mod.rs +++ b/rust/src/data_catalog/storage/mod.rs @@ -13,8 +13,9 @@ use futures::TryStreamExt; use object_store::ObjectStore; use crate::errors::DeltaResult; +use crate::open_table_with_storage_options; use crate::storage::config::{configure_store, StorageOptions}; -use crate::{ensure_table_uri, open_table_with_storage_options}; +use crate::table::builder::ensure_table_uri; const DELTA_LOG_FOLDER: &str = "_delta_log"; diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index e69c7abbfe..e542413cfd 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -67,11 +67,11 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec; use object_store::ObjectMeta; use url::Url; -use crate::action::{self, Add}; -use crate::builder::ensure_table_uri; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{self, Add}; use crate::storage::ObjectStoreRef; -use crate::table_state::DeltaTableState; +use crate::table::builder::ensure_table_uri; +use crate::table::state::DeltaTableState; use 
crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType}; const PATH_COLUMN: &str = "__delta_rs_path"; @@ -124,7 +124,7 @@ impl DeltaTableState { |acc, action| { let new_stats = action .get_stats() - .unwrap_or_else(|_| Some(action::Stats::default()))?; + .unwrap_or_else(|_| Some(protocol::Stats::default()))?; Some(Statistics { num_rows: acc .num_rows @@ -631,7 +631,7 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult PartitionedFile { let partition_values = schema @@ -1541,7 +1541,7 @@ mod tests { let mut partition_values = std::collections::HashMap::new(); partition_values.insert("month".to_string(), Some("1".to_string())); partition_values.insert("year".to_string(), Some("2015".to_string())); - let action = action::Add { + let action = protocol::Add { path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(), size: 10644, partition_values, diff --git a/rust/src/errors.rs b/rust/src/errors.rs index fed0e823f8..24989b2814 100644 --- a/rust/src/errors.rs +++ b/rust/src/errors.rs @@ -1,8 +1,8 @@ //! Exceptions for the deltalake crate use object_store::Error as ObjectStoreError; -use crate::action::ProtocolError; use crate::operations::transaction::TransactionError; +use crate::protocol::ProtocolError; /// A result returned by delta-rs pub type DeltaResult = Result; diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 2d03651cb9..af692fd5c9 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -82,42 +82,34 @@ compile_error!( "Features s3 and s3-native-tls are mutually exclusive and cannot be enabled together" ); -pub mod action; -pub mod builder; pub mod data_catalog; -pub mod delta; -pub mod delta_config; pub mod errors; pub mod operations; -pub mod partitions; +pub mod protocol; pub mod schema; pub mod storage; -pub mod table_state; -pub mod time_utils; +pub mod table; -#[cfg(feature = "arrow")] -pub mod table_state_arrow; - -#[cfg(all(feature = "arrow", feature = "parquet"))] -pub mod delta_arrow; #[cfg(feature = "datafusion")] pub mod delta_datafusion; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod writer; -pub use self::builder::*; +use std::collections::HashMap; + pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError}; -pub use self::delta::*; -pub use self::delta_config::*; -pub use self::partitions::*; +pub use self::errors::*; +pub use self::schema::partitions::*; pub use self::schema::*; -pub use errors::*; +pub use self::table::builder::{ + DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion, +}; +pub use self::table::config::DeltaConfigKey; +pub use self::table::DeltaTable; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; pub use operations::DeltaOps; // convenience exports for consumers to avoid aligning crate versions -#[cfg(all(feature = "arrow", feature = "parquet"))] -pub use action::checkpoints; #[cfg(feature = "arrow")] pub use arrow; #[cfg(feature = "datafusion")] @@ -126,15 +118,70 @@ pub use datafusion; pub use parquet; #[cfg(feature = "parquet2")] pub use parquet2; +#[cfg(all(feature = "arrow", feature = "parquet"))] +pub use protocol::checkpoints; // needed only for integration tests // TODO can / should we move this into the test crate? #[cfg(feature = "integration_test")] pub mod test_utils; +/// Creates and loads a DeltaTable from the given path with current metadata. +/// Infers the storage backend to use from the scheme in the given table path. 
+pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> { + let table = DeltaTableBuilder::from_uri(table_uri).load().await?; + Ok(table) +} + +/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced +/// `StorageService`. +pub async fn open_table_with_storage_options( + table_uri: impl AsRef<str>, + storage_options: HashMap<String, String>, +) -> Result<DeltaTable, DeltaTableError> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_storage_options(storage_options) + .load() + .await?; + Ok(table) +} + +/// Creates a DeltaTable from the given path and loads it with the metadata from the given version. +/// Infers the storage backend to use from the scheme in the given table path. +pub async fn open_table_with_version( + table_uri: impl AsRef<str>, + version: i64, +) -> Result<DeltaTable, DeltaTableError> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_version(version) + .load() + .await?; + Ok(table) +} + +/// Creates a DeltaTable from the given path. +/// Loads metadata from the version appropriate for the given ISO-8601/RFC-3339 timestamp. +/// Infers the storage backend to use from the scheme in the given table path. +pub async fn open_table_with_ds( + table_uri: impl AsRef<str>, + ds: impl AsRef<str>, +) -> Result<DeltaTable, DeltaTableError> { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_datestring(ds)? + .load() + .await?; + Ok(table) +} + +/// Returns rust crate version, can be used in language bindings to expose Rust core version +pub fn crate_version() -> &'static str { + env!("CARGO_PKG_VERSION") +} + #[cfg(test)] mod tests { use super::*; + use crate::table::PeekCommit; use std::collections::HashMap; #[tokio::test] @@ -153,7 +200,7 @@ mod tests { ); let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 4); - assert!(tombstones.contains(&crate::action::Remove { + assert!(tombstones.contains(&crate::protocol::Remove { path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(), deletion_timestamp: Some(1564524298213), data_change: false, @@ -255,7 +302,7 @@ mod tests { ); let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 1); - assert!(tombstones.contains(&crate::action::Remove { + assert!(tombstones.contains(&crate::protocol::Remove { path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(), deletion_timestamp: Some(1615043776198), data_change: true, @@ -475,7 +522,10 @@ mod tests { let table_from_struct_stats = crate::open_table(table_uri).await.unwrap(); let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap(); - fn get_stats_for_file(table: &crate::DeltaTable, file_name: &str) -> crate::action::Stats { + fn get_stats_for_file( + table: &crate::DeltaTable, + file_name: &str, + ) -> crate::protocol::Stats { table .get_file_uris() .zip(table.get_stats()) diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 697ab3ef1d..694d95a74e 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -9,13 +9,14 @@ use serde_json::{Map, Value}; use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; -use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode}; -use crate::builder::ensure_table_uri; -use crate::delta_config::DeltaConfigKey; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode}; use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct}; use
crate::storage::DeltaObjectStore; -use crate::{DeltaTable, DeltaTableBuilder, DeltaTableMetaData}; +use crate::table::builder::ensure_table_uri; +use crate::table::config::DeltaConfigKey; +use crate::table::DeltaTableMetaData; +use crate::{DeltaTable, DeltaTableBuilder}; #[derive(thiserror::Error, Debug)] enum CreateError { @@ -148,7 +149,7 @@ impl CreateBuilder { /// /// Options may be passed in the HashMap or set as environment variables. /// - /// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend. + /// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend. /// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client. /// /// If an object store is also passed using `with_object_store()` these options will be ignored. @@ -322,8 +323,8 @@ impl std::future::IntoFuture for CreateBuilder { #[cfg(all(test, feature = "parquet"))] mod tests { use super::*; - use crate::delta_config::DeltaConfigKey; use crate::operations::DeltaOps; + use crate::table::config::DeltaConfigKey; use crate::writer::test_utils::get_delta_schema; use tempdir::TempDir; diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 39b98cce24..d7f908680d 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; -use crate::action::{Action, Add, Remove}; +use crate::protocol::{Action, Add, Remove}; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -33,13 +33,14 @@ use parquet::file::properties::WriterProperties; use serde_json::Map; use serde_json::Value; -use crate::action::DeltaOperation; -use crate::delta_datafusion::{find_files, parquet_scan_from_actions, register_store}; +use crate::delta_datafusion::find_files; +use crate::delta_datafusion::{parquet_scan_from_actions, register_store}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; +use crate::protocol::DeltaOperation; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::DeltaTable; use super::datafusion_utils::Expression; @@ -324,8 +325,8 @@ impl std::future::IntoFuture for DeleteBuilder { #[cfg(test)] mod tests { - use crate::action::*; use crate::operations::DeltaOps; + use crate::protocol::*; use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; use crate::DeltaTable; diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 6cdf5390bf..bf047c45c4 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -24,11 +24,11 @@ pub use object_store::path::Path; use object_store::ObjectStore; use url::{ParseError, Url}; -use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::operations::transaction::commit; +use crate::protocol::{Action, Add, DeltaOperation, Remove}; use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::DeltaTable; /// Audit the Delta Table's active files with the 
underlying file system. diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 9501c18011..7baa59e3e1 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -8,7 +8,7 @@ use futures::future::BoxFuture; use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::DeltaTable; #[derive(Debug, Clone)] diff --git a/rust/src/operations/merge.rs b/rust/src/operations/merge.rs index 29c5ffaa27..d088fbd3b7 100644 --- a/rust/src/operations/merge.rs +++ b/rust/src/operations/merge.rs @@ -32,6 +32,10 @@ //! .await? //! ```` +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + use arrow_schema::SchemaRef; use datafusion::error::Result as DataFusionResult; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -55,27 +59,16 @@ use datafusion_physical_expr::{create_physical_expr, expressions, PhysicalExpr}; use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; -use std::{ - collections::HashMap, - sync::Arc, - time::{Instant, SystemTime, UNIX_EPOCH}, -}; -use crate::action::MergePredicate; +use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; +use super::transaction::commit; +use crate::delta_datafusion::{parquet_scan_from_actions, register_store}; use crate::operations::datafusion_utils::MetricObserverExec; -use crate::{ - action::{Action, DeltaOperation, Remove}, - delta_datafusion::{parquet_scan_from_actions, register_store}, - operations::write::write_execution_plan, - storage::{DeltaObjectStore, ObjectStoreRef}, - table_state::DeltaTableState, - DeltaResult, DeltaTable, DeltaTableError, -}; - -use super::{ - datafusion_utils::{into_expr, maybe_into_expr, Expression}, - transaction::commit, -}; +use crate::operations::write::write_execution_plan; +use crate::protocol::{Action, DeltaOperation, MergePredicate, Remove}; +use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; const OPERATION_COLUMN: &str = "__delta_rs_operation"; const DELETE_COLUMN: &str = "__delta_rs_delete"; @@ -1105,8 +1098,8 @@ impl std::future::IntoFuture for MergeBuilder { #[cfg(test)] mod tests { - use crate::action::*; use crate::operations::DeltaOps; + use crate::protocol::*; use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::get_arrow_schema; use crate::writer::test_utils::get_delta_schema; diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 28dea06777..7b6cb27ace 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -10,8 +10,8 @@ use self::create::CreateBuilder; use self::filesystem_check::FileSystemCheckBuilder; use self::vacuum::VacuumBuilder; -use crate::builder::DeltaTableBuilder; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; pub mod create; @@ -213,7 +213,7 @@ mod datafusion_utils { use datafusion_expr::Expr; use futures::{Stream, StreamExt}; - use crate::{table_state::DeltaTableState, DeltaResult}; + use crate::{table::state::DeltaTableState, DeltaResult}; /// Used to represent user input of either a Datafusion expression or string expression pub enum Expression { diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index 
c26150f9f3..eafc768519 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -41,10 +41,10 @@ use serde_json::Map; use super::transaction::commit; use super::writer::{PartitionWriter, PartitionWriterConfig}; -use crate::action::{self, Action, DeltaOperation}; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{self, Action, DeltaOperation}; use crate::storage::ObjectStoreRef; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::writer::utils::arrow_schema_without_partitions; use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter}; @@ -311,7 +311,7 @@ fn create_remove( let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let deletion_time = deletion_time.as_millis() as i64; - Ok(Action::remove(action::Remove { + Ok(Action::remove(protocol::Remove { path: path.to_string(), deletion_timestamp: Some(deletion_time), data_change: false, diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index 5800edd96f..c450554fea 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -30,11 +30,11 @@ use object_store::path::Path; use object_store::ObjectStore; use serde::Serialize; -use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; +use crate::protocol::{Action, Add, DeltaOperation, Protocol, Remove}; use crate::storage::ObjectStoreRef; -use crate::table_state::DeltaTableState; -use crate::{action, DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; +use crate::table::state::DeltaTableState; +use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; /// Errors that can occur during restore #[derive(thiserror::Error, Debug)] @@ -202,12 +202,12 @@ async fn execute( let mut actions = vec![]; let protocol = if protocol_downgrade_allowed { - action::Protocol { + Protocol { min_reader_version: table.get_min_reader_version(), min_writer_version: table.get_min_writer_version(), } } else { - action::Protocol { + Protocol { min_reader_version: max( table.get_min_reader_version(), snapshot.min_reader_version(), @@ -249,7 +249,7 @@ async fn execute( async fn check_files_available( object_store: &dyn ObjectStore, - files: &Vec, + files: &Vec, ) -> DeltaResult<()> { for file in files { let file_path = Path::parse(file.path.clone())?; diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs index 1058400787..d75e401def 100644 --- a/rust/src/operations/transaction/conflict_checker.rs +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -5,11 +5,11 @@ use std::io::{BufRead, BufReader, Cursor}; use object_store::ObjectStore; use super::CommitInfo; -use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove}; -use crate::delta_config::IsolationLevel; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{Action, Add, DeltaOperation, MetaData, Protocol, Remove}; use crate::storage::commit_uri_from_version; -use crate::table_state::DeltaTableState; +use crate::table::config::IsolationLevel; +use crate::table::state::DeltaTableState; #[cfg(feature = "datafusion")] use super::state::AddContainer; @@ -53,8 +53,8 @@ pub enum CommitConflictError { /// you may need to upgrade your Delta Lake version. /// - When multiple writers are creating or replacing a table at the same time. 
/// - When multiple writers are writing to an empty path at the same time. - #[error("Protocol changed since last commit.")] - ProtocolChanged, + #[error("Protocol changed since last commit: {0}")] + ProtocolChanged(String), /// Error returned when the table requires an unsupported writer version #[error("Delta-rs does not support writer version {0}")] @@ -392,10 +392,18 @@ impl<'a> ConflictChecker<'a> { /// to read and write against the protocol set by the committed transaction. fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> { for p in self.winning_commit_summary.protocol() { - if self.txn_info.read_snapshot.min_reader_version() < p.min_reader_version - || self.txn_info.read_snapshot.min_writer_version() < p.min_writer_version - { - return Err(CommitConflictError::ProtocolChanged); + let (win_read, curr_read) = ( + p.min_reader_version, + self.txn_info.read_snapshot.min_reader_version(), + ); + let (win_write, curr_write) = ( + p.min_writer_version, + self.txn_info.read_snapshot.min_writer_version(), + ); + if curr_read < win_read || win_write < curr_write { + return Err(CommitConflictError::ProtocolChanged( + format!("required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"), + )); }; } if !self.winning_commit_summary.protocol().is_empty() @@ -405,7 +413,9 @@ impl<'a> ConflictChecker<'a> { .iter() .any(|a| matches!(a, Action::protocol(_))) { - return Err(CommitConflictError::ProtocolChanged); + return Err(CommitConflictError::ProtocolChanged( + "protocol changed".into(), + )); }; Ok(()) } @@ -631,7 +641,7 @@ mod tests { use super::super::test_utils as tu; use super::super::test_utils::init_table_actions; use super::*; - use crate::action::Action; + use crate::protocol::Action; #[cfg(feature = "datafusion")] use datafusion_expr::{col, lit}; use serde_json::json; @@ -818,7 +828,7 @@ mod tests { vec![tu::create_protocol_action(None, None)], false, ); - assert!(matches!(result, Err(CommitConflictError::ProtocolChanged))); + assert!(matches!( + result, + Err(CommitConflictError::ProtocolChanged(_)) + )); // taint whole table // `read_whole_table` should disallow any concurrent change, even if the change diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index fe99af1749..cc26e75fc2 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -5,11 +5,11 @@ use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::{Map, Value}; -use crate::action::{Action, CommitInfo, DeltaOperation}; use crate::crate_version; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{Action, CommitInfo, DeltaOperation}; use crate::storage::commit_uri_from_version; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; mod conflict_checker; #[cfg(feature = "datafusion")] diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 98a20d8866..6fe1d65aee 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -19,12 +19,12 @@ use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; use sqlparser::tokenizer::Tokenizer; -use crate::action::Add; use crate::delta_datafusion::{ get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value, }; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::table_state::DeltaTableState; +use crate::protocol::Add; +use
crate::table::state::DeltaTableState; impl DeltaTableState { /// Get the table schema as an [`ArrowSchemaRef`] diff --git a/rust/src/operations/transaction/test_utils.rs b/rust/src/operations/transaction/test_utils.rs index 92d981d5e1..cdd98f8d1f 100644 --- a/rust/src/operations/transaction/test_utils.rs +++ b/rust/src/operations/transaction/test_utils.rs @@ -1,12 +1,12 @@ #![allow(unused)] -use super::{prepare_commit, try_commit_transaction, CommitInfo}; -use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove, SaveMode}; -use crate::table_state::DeltaTableState; -use crate::{ - DeltaTable, DeltaTableBuilder, DeltaTableMetaData, Schema, SchemaDataType, SchemaField, -}; use std::collections::HashMap; +use super::{prepare_commit, try_commit_transaction, CommitInfo}; +use crate::protocol::{Action, Add, DeltaOperation, MetaData, Protocol, Remove, SaveMode}; +use crate::table::state::DeltaTableState; +use crate::table::DeltaTableMetaData; +use crate::{DeltaTable, DeltaTableBuilder, Schema, SchemaDataType, SchemaField}; + pub fn create_add_action( path: impl Into, data_change: bool, diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index a104732a8f..b030bc5644 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -43,10 +43,10 @@ use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; use crate::{ - action::{Action, DeltaOperation, Remove}, delta_datafusion::{find_files, parquet_scan_from_actions, register_store}, + protocol::{Action, DeltaOperation, Remove}, storage::{DeltaObjectStore, ObjectStoreRef}, - table_state::DeltaTableState, + table::state::DeltaTableState, DeltaResult, DeltaTable, DeltaTableError, }; @@ -475,7 +475,7 @@ mod tests { use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; use crate::DeltaTable; - use crate::{action::*, DeltaResult}; + use crate::{protocol::SaveMode, DeltaResult}; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::Int32Array; diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs index 40e7657edf..ccaa22a33e 100644 --- a/rust/src/operations/vacuum.rs +++ b/rust/src/operations/vacuum.rs @@ -33,7 +33,7 @@ use object_store::{path::Path, ObjectStore}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::DeltaTable; /// Errors that can occur during vacuum diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index ca96134935..861e9a9dff 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -32,15 +32,15 @@ use serde_json::Map; use super::writer::{DeltaWriter, WriterConfig}; use super::MAX_SUPPORTED_WRITER_VERSION; use super::{transaction::commit, CreateBuilder}; -use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode}; -use crate::delta::DeltaTable; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::{Action, Add, DeltaOperation, Remove, SaveMode}; use crate::schema::Schema; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; -use crate::table_state::DeltaTableState; +use crate::table::state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; +use crate::DeltaTable; #[derive(thiserror::Error, Debug)] enum WriteError 
{ @@ -552,8 +552,8 @@ fn cast_record_batch( #[cfg(test)] mod tests { use super::*; - use crate::action::SaveMode; use crate::operations::{collect_sendable_stream, DeltaOps}; + use crate::protocol::SaveMode; use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::{ get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs index a72b832505..05bda44ae6 100644 --- a/rust/src/operations/writer.rs +++ b/rust/src/operations/writer.rs @@ -2,16 +2,6 @@ use std::collections::HashMap; -use crate::action::Add; -use crate::storage::ObjectStoreRef; -use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; -use crate::writer::stats::create_add; -use crate::writer::utils::{ - self, arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath, - ShareableBuffer, -}; -use crate::{crate_version, DeltaResult, DeltaTableError}; - use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; @@ -21,6 +11,17 @@ use parquet::arrow::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; +use crate::crate_version; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::Add; +use crate::storage::ObjectStoreRef; +use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; +use crate::writer::stats::create_add; +use crate::writer::utils::{ + arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, + PartitionPath, ShareableBuffer, +}; + // TODO databricks often suggests a file size of 100mb, should we set this default? const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600; const DEFAULT_WRITE_BATCH_SIZE: usize = 1024; @@ -304,7 +305,7 @@ impl PartitionWriter { fn next_data_path(&mut self) -> Path { self.part_counter += 1; - utils::next_data_path( + next_data_path( &self.config.prefix, self.part_counter, &self.writer_id, diff --git a/rust/src/action/checkpoints.rs b/rust/src/protocol/checkpoints.rs similarity index 98% rename from rust/src/action/checkpoints.rs rename to rust/src/protocol/checkpoints.rs index 86dbc7bc87..3bf2eb962e 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/protocol/checkpoints.rs @@ -19,12 +19,13 @@ use parquet::errors::ParquetError; use regex::Regex; use serde_json::Value; -use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn}; -use crate::delta_arrow::delta_log_schema_for_table; +use super::{time_utils, Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn}; +use crate::arrow_convert::delta_log_schema_for_table; +use crate::schema::*; use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; -use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable}; -use crate::{schema::*, CheckPointBuilder}; +use crate::table::state::DeltaTableState; +use crate::table::{CheckPoint, CheckPointBuilder}; +use crate::{open_table_with_version, DeltaTable}; type SchemaPath = Vec; diff --git a/rust/src/action/mod.rs b/rust/src/protocol/mod.rs similarity index 99% rename from rust/src/action/mod.rs rename to rust/src/protocol/mod.rs index 105ebdcdab..b03ebe4a9a 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/protocol/mod.rs @@ -9,6 +9,7 @@ pub mod parquet2_read; #[cfg(feature = "parquet")] mod parquet_read; mod serde_path; +mod time_utils; #[cfg(feature = "arrow")] use arrow_schema::ArrowError; @@ -25,10 +26,11 @@ use 
std::hash::{Hash, Hasher}; use std::mem::take; use std::str::FromStr; -use crate::delta_config::IsolationLevel; use crate::errors::DeltaResult; use crate::storage::ObjectStoreRef; -use crate::{delta::CheckPoint, schema::*, DeltaTableMetaData}; +use crate::table::config::IsolationLevel; +use crate::table::DeltaTableMetaData; +use crate::{schema::*, table::CheckPoint}; /// Error returned when an invalid Delta log action is encountered. #[allow(missing_docs)] diff --git a/rust/src/action/parquet2_read/boolean.rs b/rust/src/protocol/parquet2_read/boolean.rs similarity index 98% rename from rust/src/action/parquet2_read/boolean.rs rename to rust/src/protocol/parquet2_read/boolean.rs index 553ba36709..474a61a153 100644 --- a/rust/src/action/parquet2_read/boolean.rs +++ b/rust/src/protocol/parquet2_read/boolean.rs @@ -4,7 +4,7 @@ use parquet2::page::DataPage; use super::validity::ValidityRowIndexIter; use super::{split_page, ActionVariant, ParseError}; -use crate::action::Action; +use crate::protocol::Action; /// Parquet dictionary primitive value reader pub struct SomeBooleanValueIter<'a> { diff --git a/rust/src/action/parquet2_read/dictionary/binary.rs b/rust/src/protocol/parquet2_read/dictionary/binary.rs similarity index 100% rename from rust/src/action/parquet2_read/dictionary/binary.rs rename to rust/src/protocol/parquet2_read/dictionary/binary.rs diff --git a/rust/src/action/parquet2_read/dictionary/mod.rs b/rust/src/protocol/parquet2_read/dictionary/mod.rs similarity index 100% rename from rust/src/action/parquet2_read/dictionary/mod.rs rename to rust/src/protocol/parquet2_read/dictionary/mod.rs diff --git a/rust/src/action/parquet2_read/dictionary/primitive.rs b/rust/src/protocol/parquet2_read/dictionary/primitive.rs similarity index 100% rename from rust/src/action/parquet2_read/dictionary/primitive.rs rename to rust/src/protocol/parquet2_read/dictionary/primitive.rs diff --git a/rust/src/action/parquet2_read/map.rs b/rust/src/protocol/parquet2_read/map.rs similarity index 99% rename from rust/src/action/parquet2_read/map.rs rename to rust/src/protocol/parquet2_read/map.rs index ed730e383b..0739feae2d 100644 --- a/rust/src/action/parquet2_read/map.rs +++ b/rust/src/protocol/parquet2_read/map.rs @@ -3,7 +3,7 @@ use parquet2::page::{DataPage, DictPage}; use super::string::for_each_repeated_string_field_value_with_idx; use super::{ActionVariant, ParseError}; -use crate::action::Action; +use crate::protocol::Action; #[derive(Default)] pub struct MapState { diff --git a/rust/src/action/parquet2_read/mod.rs b/rust/src/protocol/parquet2_read/mod.rs similarity index 99% rename from rust/src/action/parquet2_read/mod.rs rename to rust/src/protocol/parquet2_read/mod.rs index afa6065279..28908fe6bd 100644 --- a/rust/src/action/parquet2_read/mod.rs +++ b/rust/src/protocol/parquet2_read/mod.rs @@ -11,7 +11,7 @@ use parquet2::read::get_page_iterator; use parquet2::read::levels::get_bit_width; use super::ProtocolError; -use crate::action::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn}; +use crate::protocol::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn}; use crate::schema::Guid; use boolean::for_each_boolean_field_value; use map::for_each_map_field_value; @@ -751,7 +751,7 @@ mod tests { assert_eq!(meta_data.description, None); assert_eq!( meta_data.format, - crate::action::Format::new("parquet".to_string(), None), + crate::protocol::Format::new("parquet".to_string(), None), ); assert_eq!(meta_data.schema_string, 
"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}"); assert_eq!(meta_data.partition_columns.len(), 0); @@ -826,7 +826,7 @@ mod tests { assert_eq!(meta_data.description, None); assert_eq!( meta_data.format, - crate::action::Format::new("parquet".to_string(), None), + crate::protocol::Format::new("parquet".to_string(), None), ); assert_eq!( meta_data.schema_string, diff --git a/rust/src/action/parquet2_read/primitive.rs b/rust/src/protocol/parquet2_read/primitive.rs similarity index 99% rename from rust/src/action/parquet2_read/primitive.rs rename to rust/src/protocol/parquet2_read/primitive.rs index 4f262f7aaa..29147ea8ca 100644 --- a/rust/src/action/parquet2_read/primitive.rs +++ b/rust/src/protocol/parquet2_read/primitive.rs @@ -12,7 +12,7 @@ use parquet2::types::NativeType; use super::dictionary; use super::validity::ValidityRowIndexIter; use super::{split_page, ActionVariant, ParseError}; -use crate::action::Action; +use crate::protocol::Action; struct ExactChunksIter<'a, T: NativeType> { chunks: std::slice::ChunksExact<'a, u8>, diff --git a/rust/src/action/parquet2_read/stats.rs b/rust/src/protocol/parquet2_read/stats.rs similarity index 85% rename from rust/src/action/parquet2_read/stats.rs rename to rust/src/protocol/parquet2_read/stats.rs index c9bb2f9bdc..689dfea6c0 100644 --- a/rust/src/action/parquet2_read/stats.rs +++ b/rust/src/protocol/parquet2_read/stats.rs @@ -1,4 +1,4 @@ -use crate::action::{Add, ProtocolError, Stats}; +use crate::protocol::{Add, ProtocolError, Stats}; impl Add { /// Returns the composite HashMap representation of stats contained in the action if present. diff --git a/rust/src/action/parquet2_read/string.rs b/rust/src/protocol/parquet2_read/string.rs similarity index 99% rename from rust/src/action/parquet2_read/string.rs rename to rust/src/protocol/parquet2_read/string.rs index 1a851aec3b..fc0ec574e0 100644 --- a/rust/src/action/parquet2_read/string.rs +++ b/rust/src/protocol/parquet2_read/string.rs @@ -9,7 +9,7 @@ use super::dictionary; use super::dictionary::binary::BinaryPageDict; use super::validity::{ValidityRepeatedRowIndexIter, ValidityRowIndexIter}; use super::{split_page, split_page_nested, ActionVariant, ParseError}; -use crate::action::Action; +use crate::protocol::Action; pub trait StringValueIter<'a>: Iterator> { fn try_from_encoded_values( diff --git a/rust/src/action/parquet2_read/validity.rs b/rust/src/protocol/parquet2_read/validity.rs similarity index 100% rename from rust/src/action/parquet2_read/validity.rs rename to rust/src/protocol/parquet2_read/validity.rs diff --git a/rust/src/action/parquet_read/mod.rs b/rust/src/protocol/parquet_read/mod.rs similarity index 99% rename from rust/src/action/parquet_read/mod.rs rename to rust/src/protocol/parquet_read/mod.rs index 3ff6dfa710..93fdc4c2df 100644 --- a/rust/src/action/parquet_read/mod.rs +++ b/rust/src/protocol/parquet_read/mod.rs @@ -6,7 +6,7 @@ use num_traits::cast::ToPrimitive; use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor}; use serde_json::json; -use crate::action::{ +use crate::protocol::{ Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol, ProtocolError, Remove, Stats, Txn, }; diff --git a/rust/src/action/serde_path.rs b/rust/src/protocol/serde_path.rs similarity index 100% rename from rust/src/action/serde_path.rs rename to rust/src/protocol/serde_path.rs diff --git a/rust/src/time_utils.rs b/rust/src/protocol/time_utils.rs similarity index 100% rename 
from rust/src/time_utils.rs rename to rust/src/protocol/time_utils.rs diff --git a/rust/src/delta_arrow.rs b/rust/src/schema/arrow_convert.rs similarity index 100% rename from rust/src/delta_arrow.rs rename to rust/src/schema/arrow_convert.rs diff --git a/rust/src/schema.rs b/rust/src/schema/mod.rs similarity index 99% rename from rust/src/schema.rs rename to rust/src/schema/mod.rs index 2602c5cd68..a853725fc6 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema/mod.rs @@ -8,6 +8,10 @@ use std::collections::HashMap; use crate::errors::DeltaTableError; +#[cfg(all(feature = "arrow", feature = "parquet"))] +pub mod arrow_convert; +pub mod partitions; + /// Type alias for a string expected to match a GUID/UUID format pub type Guid = String; diff --git a/rust/src/partitions.rs b/rust/src/schema/partitions.rs similarity index 90% rename from rust/src/partitions.rs rename to rust/src/schema/partitions.rs index ed4a5a2eaf..0c1b0f6404 100644 --- a/rust/src/partitions.rs +++ b/rust/src/schema/partitions.rs @@ -2,7 +2,7 @@ use std::convert::TryFrom; -use super::schema::SchemaDataType; +use super::SchemaDataType; use crate::errors::DeltaTableError; use std::cmp::Ordering; use std::collections::HashMap; @@ -189,20 +189,18 @@ pub struct DeltaTablePartition<'a> { pub value: &'a str, } -/** - * Create a DeltaTable partition from a HivePartition string. - * - * A HivePartition string is represented by a "key=value" format. - * - * ```rust - * use deltalake::DeltaTablePartition; - * - * let hive_part = "ds=2023-01-01"; - * let partition = DeltaTablePartition::try_from(hive_part).unwrap(); - * assert_eq!("ds", partition.key); - * assert_eq!("2023-01-01", partition.value); - * ``` - */ +/// Create a DeltaTable partition from a HivePartition string. +/// +/// A HivePartition string is represented by a "key=value" format. +/// +/// ```rust +/// use deltalake::DeltaTablePartition; +/// +/// let hive_part = "ds=2023-01-01"; +/// let partition = DeltaTablePartition::try_from(hive_part).unwrap(); +/// assert_eq!("ds", partition.key); +/// assert_eq!("2023-01-01", partition.value); +/// ``` impl<'a> TryFrom<&'a str> for DeltaTablePartition<'a> { type Error = DeltaTableError; @@ -223,20 +221,18 @@ impl<'a> TryFrom<&'a str> for DeltaTablePartition<'a> { } impl<'a> DeltaTablePartition<'a> { - /** - * Try to create a DeltaTable partition from a partition value kv pair. - * - * ```rust - * use deltalake::DeltaTablePartition; - * - * let value = (&"ds".to_string(), &Some("2023-01-01".to_string())); - * let null_default = "1979-01-01"; - * let partition = DeltaTablePartition::from_partition_value(value, null_default); - * - * assert_eq!("ds", partition.key); - * assert_eq!("2023-01-01", partition.value); - * ``` - */ + /// Try to create a DeltaTable partition from a partition value kv pair. 
+ /// + /// ```rust + /// use deltalake::DeltaTablePartition; + /// + /// let value = (&"ds".to_string(), &Some("2023-01-01".to_string())); + /// let null_default = "1979-01-01"; + /// let partition = DeltaTablePartition::from_partition_value(value, null_default); + /// + /// assert_eq!("ds", partition.key); + /// assert_eq!("2023-01-01", partition.value); + /// ``` pub fn from_partition_value( partition_value: (&'a String, &'a Option), default_for_null: &'a str, diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index 76c2e6dd81..1cba57b579 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -299,7 +299,7 @@ fn url_prefix_handler(store: T, prefix: Path) -> DeltaResult + Clone) -> DeltaResult { let mut options = options.into(); let storage = config::configure_store(&location, &mut options)?; diff --git a/rust/src/storage/s3.rs b/rust/src/storage/s3.rs index ec5e6a344a..2cc33f3ae3 100644 --- a/rust/src/storage/s3.rs +++ b/rust/src/storage/s3.rs @@ -1,7 +1,7 @@ //! AWS S3 storage backend. use super::utils::str_is_truthy; -use crate::builder::{s3_storage_options, str_option}; +use crate::table::builder::{s3_storage_options, str_option}; use bytes::Bytes; use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS}; use futures::stream::BoxStream; diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index 7cb27d721a..80710efd9b 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -8,9 +8,9 @@ use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, ObjectMeta, Result as ObjectStoreResult}; -use crate::action::Add; -use crate::builder::DeltaTableBuilder; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::protocol::Add; +use crate::table::builder::DeltaTableBuilder; /// Copies the contents from the `from` location into the `to` location pub async fn copy_table( @@ -82,7 +82,7 @@ impl TryFrom<&Add> for ObjectMeta { fn try_from(value: &Add) -> DeltaResult { let last_modified = Utc.from_utc_datetime( &NaiveDateTime::from_timestamp_millis(value.modification_time).ok_or( - DeltaTableError::from(crate::action::ProtocolError::InvalidField(format!( + DeltaTableError::from(crate::protocol::ProtocolError::InvalidField(format!( "invalid modification_time: {:?}", value.modification_time ))), diff --git a/rust/src/builder.rs b/rust/src/table/builder.rs similarity index 99% rename from rust/src/builder.rs rename to rust/src/table/builder.rs index decfda8db0..92fc4851ad 100644 --- a/rust/src/builder.rs +++ b/rust/src/table/builder.rs @@ -9,7 +9,7 @@ use object_store::DynObjectStore; use serde::{Deserialize, Serialize}; use url::Url; -use crate::delta::DeltaTable; +use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::config::StorageOptions; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; diff --git a/rust/src/delta_config.rs b/rust/src/table/config.rs similarity index 99% rename from rust/src/delta_config.rs rename to rust/src/table/config.rs index 1b7f6b7e0f..60498767ab 100644 --- a/rust/src/delta_config.rs +++ b/rust/src/table/config.rs @@ -387,7 +387,7 @@ fn parse_int(value: &str) -> Result { #[cfg(test)] mod tests { use super::*; - use crate::DeltaTableMetaData; + use crate::table::DeltaTableMetaData; use crate::Schema; use std::collections::HashMap; diff --git a/rust/src/delta.rs b/rust/src/table/mod.rs similarity index 92% rename from rust/src/delta.rs rename to rust/src/table/mod.rs 
index d877b77191..667b2239aa 100644 --- a/rust/src/delta.rs +++ b/rust/src/table/mod.rs @@ -1,7 +1,5 @@ //! Delta Table read and write implementation -// Reference: https://github.com/delta-io/delta/blob/master/PROTOCOL.md -// use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -21,17 +19,20 @@ use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use uuid::Uuid; -use super::action; -use super::action::{find_latest_check_point_for_version, get_last_checkpoint, Action}; -use super::partitions::PartitionFilter; -use super::schema::*; -use super::table_state::DeltaTableState; -use crate::action::{Add, ProtocolError, Stats}; +use self::builder::DeltaTableConfig; +use self::state::DeltaTableState; use crate::errors::DeltaTableError; +use crate::partitions::PartitionFilter; +use crate::protocol::{self, find_latest_check_point_for_version, get_last_checkpoint, Action}; +use crate::protocol::{Add, ProtocolError, Stats}; +use crate::schema::*; use crate::storage::{commit_uri_from_version, ObjectStoreRef}; -// TODO re-exports only for transition -pub use crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; +pub mod builder; +pub mod config; +pub mod state; +#[cfg(feature = "arrow")] +pub mod state_arrow; /// Metadata for a checkpoint file #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)] @@ -136,7 +137,7 @@ pub struct DeltaTableMetaData { /// User-provided description for this table pub description: Option, /// Specification of the encoding for the files stored in the table - pub format: action::Format, + pub format: protocol::Format, /// Schema of the table pub schema: Schema, /// An array containing the names of columns by which the data should be partitioned @@ -152,7 +153,7 @@ impl DeltaTableMetaData { pub fn new( name: Option, description: Option, - format: Option, + format: Option, schema: Schema, partition_columns: Vec, configuration: HashMap>, @@ -208,10 +209,10 @@ impl fmt::Display for DeltaTableMetaData { } } -impl TryFrom for DeltaTableMetaData { +impl TryFrom for DeltaTableMetaData { type Error = ProtocolError; - fn try_from(action_metadata: action::MetaData) -> Result { + fn try_from(action_metadata: protocol::MetaData) -> Result { let schema = action_metadata.get_schema()?; Ok(Self { id: action_metadata.id, @@ -663,7 +664,7 @@ impl DeltaTable { pub async fn history( &mut self, limit: Option, - ) -> Result, DeltaTableError> { + ) -> Result, DeltaTableError> { let mut version = match limit { Some(l) => max(self.version() - l as i64 + 1, 0), None => self.get_earliest_delta_log_version().await?, @@ -796,7 +797,7 @@ impl DeltaTable { } /// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log). - pub fn get_tombstones(&self) -> impl Iterator { + pub fn get_tombstones(&self) -> impl Iterator { self.state.unexpired_tombstones() } @@ -907,64 +908,12 @@ impl std::fmt::Debug for DeltaTable { } } -/// Creates and loads a DeltaTable from the given path with current metadata. -/// Infers the storage backend to use from the scheme in the given table path. -pub async fn open_table(table_uri: impl AsRef) -> Result { - let table = DeltaTableBuilder::from_uri(table_uri).load().await?; - Ok(table) -} - -/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced -/// `StorageService`. 
-pub async fn open_table_with_storage_options( - table_uri: impl AsRef, - storage_options: HashMap, -) -> Result { - let table = DeltaTableBuilder::from_uri(table_uri) - .with_storage_options(storage_options) - .load() - .await?; - Ok(table) -} - -/// Creates a DeltaTable from the given path and loads it with the metadata from the given version. -/// Infers the storage backend to use from the scheme in the given table path. -pub async fn open_table_with_version( - table_uri: impl AsRef, - version: i64, -) -> Result { - let table = DeltaTableBuilder::from_uri(table_uri) - .with_version(version) - .load() - .await?; - Ok(table) -} - -/// Creates a DeltaTable from the given path. -/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp. -/// Infers the storage backend to use from the scheme in the given table path. -pub async fn open_table_with_ds( - table_uri: impl AsRef, - ds: impl AsRef, -) -> Result { - let table = DeltaTableBuilder::from_uri(table_uri) - .with_datestring(ds)? - .load() - .await?; - Ok(table) -} - -/// Returns rust crate version, can be use used in language bindings to expose Rust core version -pub fn crate_version() -> &'static str { - env!("CARGO_PKG_VERSION") -} - #[cfg(test)] mod tests { use super::*; - #[cfg(any(feature = "s3", feature = "s3-native-tls"))] - use crate::builder::DeltaTableBuilder; use crate::operations::create::CreateBuilder; + #[cfg(any(feature = "s3", feature = "s3-native-tls"))] + use crate::table::builder::DeltaTableBuilder; use pretty_assertions::assert_eq; use std::collections::HashMap; use tempdir::TempDir; diff --git a/rust/src/table_state.rs b/rust/src/table/state.rs similarity index 91% rename from rust/src/table_state.rs rename to rust/src/table/state.rs index 2ac17032d4..71aa6bddc9 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table/state.rs @@ -10,14 +10,15 @@ use lazy_static::lazy_static; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use serde::{Deserialize, Serialize}; -use crate::action::{self, Action, Add, ProtocolError}; -use crate::delta_config::TableConfig; +use super::config::TableConfig; use crate::errors::DeltaTableError; use crate::partitions::{DeltaTablePartition, PartitionFilter}; +use crate::protocol::{self, Action, Add, ProtocolError}; use crate::schema::SchemaDataType; use crate::storage::commit_uri_from_version; +use crate::table::DeltaTableMetaData; +use crate::DeltaTable; use crate::Schema; -use crate::{DeltaTable, DeltaTableMetaData}; #[cfg(any(feature = "parquet", feature = "parquet2"))] use super::{CheckPoint, DeltaTableConfig}; @@ -30,13 +31,13 @@ pub struct DeltaTableState { version: i64, // A remove action should remain in the state of the table as a tombstone until it has expired. 
// A tombstone expires when the creation timestamp of the delta file exceeds the expiration - tombstones: HashSet, + tombstones: HashSet, // active files for table state - files: Vec, + files: Vec, // Information added to individual commits - commit_infos: Vec, + commit_infos: Vec, // Domain metadatas provided by the system or user - domain_metadatas: Vec, + domain_metadatas: Vec, app_transaction_version: HashMap, min_reader_version: i32, min_writer_version: i32, @@ -75,7 +76,7 @@ impl DeltaTableState { let mut new_state = DeltaTableState::with_version(version); for line in reader.lines() { - let action: action::Action = serde_json::from_str(line?.as_str())?; + let action: protocol::Action = serde_json::from_str(line?.as_str())?; new_state.process_action( action, table.config.require_tombstones, @@ -109,13 +110,13 @@ impl DeltaTableState { let preader = SerializedFileReader::new(data)?; let schema = preader.metadata().file_metadata().schema(); if !schema.is_group() { - return Err(DeltaTableError::from(action::ProtocolError::Generic( + return Err(DeltaTableError::from(protocol::ProtocolError::Generic( "Action record in checkpoint should be a struct".to_string(), ))); } for record in preader.get_row_iter(None)? { self.process_action( - action::Action::from_parquet_record(schema, &record.unwrap())?, + protocol::Action::from_parquet_record(schema, &record.unwrap())?, table_config.require_tombstones, table_config.require_files, )?; @@ -124,7 +125,7 @@ impl DeltaTableState { #[cfg(feature = "parquet2")] { - use crate::action::parquet2_read::actions_from_row_group; + use crate::protocol::parquet2_read::actions_from_row_group; use parquet2::read::read_metadata; let mut reader = std::io::Cursor::new(data); @@ -132,7 +133,7 @@ impl DeltaTableState { for row_group in metadata.row_groups { for action in actions_from_row_group(row_group, &mut reader) - .map_err(action::ProtocolError::from)? + .map_err(protocol::ProtocolError::from)? { self.process_action( action, @@ -164,7 +165,7 @@ impl DeltaTableState { } /// List of commit info maps. - pub fn commit_infos(&self) -> &Vec { + pub fn commit_infos(&self) -> &Vec { &self.commit_infos } @@ -184,13 +185,13 @@ impl DeltaTableState { } /// Full list of tombstones (remove actions) representing files removed from table state). - pub fn all_tombstones(&self) -> &HashSet { + pub fn all_tombstones(&self) -> &HashSet { &self.tombstones } /// List of unexpired tombstones (remove actions) representing files removed from table state. /// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week. - pub fn unexpired_tombstones(&self) -> impl Iterator { + pub fn unexpired_tombstones(&self) -> impl Iterator { let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis; self.tombstones .iter() @@ -199,7 +200,7 @@ impl DeltaTableState { /// Full list of add actions representing all parquet files that are part of the current /// delta table state. - pub fn files(&self) -> &Vec { + pub fn files(&self) -> &Vec { self.files.as_ref() } @@ -318,28 +319,28 @@ impl DeltaTableState { /// Process given action by updating current state. 
fn process_action( &mut self, - action: action::Action, + action: protocol::Action, require_tombstones: bool, require_files: bool, ) -> Result<(), ProtocolError> { match action { // TODO: optionally load CDC into TableState - action::Action::cdc(_v) => {} - action::Action::add(v) => { + protocol::Action::cdc(_v) => {} + protocol::Action::add(v) => { if require_files { self.files.push(v); } } - action::Action::remove(v) => { + protocol::Action::remove(v) => { if require_tombstones && require_files { self.tombstones.insert(v); } } - action::Action::protocol(v) => { + protocol::Action::protocol(v) => { self.min_reader_version = v.min_reader_version; self.min_writer_version = v.min_writer_version; } - action::Action::metaData(v) => { + protocol::Action::metaData(v) => { let md = DeltaTableMetaData::try_from(v)?; let table_config = TableConfig(&md.configuration); self.tombstone_retention_millis = @@ -349,16 +350,16 @@ impl DeltaTableState { self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup(); self.current_metadata = Some(md); } - action::Action::txn(v) => { + protocol::Action::txn(v) => { *self .app_transaction_version .entry(v.app_id) .or_insert(v.version) = v.version; } - action::Action::commitInfo(v) => { + protocol::Action::commitInfo(v) => { self.commit_infos.push(v); } - action::Action::domainMetadata(v) => { + protocol::Action::domainMetadata(v) => { self.domain_metadatas.push(v); } } @@ -451,7 +452,7 @@ mod tests { enable_expired_log_cleanup: true, }; - let txn_action = action::Action::txn(action::Txn { + let txn_action = protocol::Action::txn(protocol::Txn { app_id: "abc".to_string(), version: 2, last_updated: Some(0), diff --git a/rust/src/table_state_arrow.rs b/rust/src/table/state_arrow.rs similarity index 99% rename from rust/src/table_state_arrow.rs rename to rust/src/table/state_arrow.rs index d4765d48d8..34f858f415 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table/state_arrow.rs @@ -1,6 +1,6 @@ //! Methods to get Delta Table state in Arrow structures //! -//! See [crate::table_state::DeltaTableState]. +//! See [crate::table::DeltaTableState]. 
use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque}; @@ -16,9 +16,9 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, TimeUnit}; use itertools::Itertools; -use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; +use super::state::DeltaTableState; use crate::errors::DeltaTableError; -use crate::table_state::DeltaTableState; +use crate::protocol::{ColumnCountStat, ColumnValueStat, Stats}; use crate::SchemaDataType; use crate::SchemaTypeStruct; diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 352a46d2b3..124ec0365b 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -385,7 +385,7 @@ pub mod az_cli { /// small wrapper around s3 cli pub mod s3_cli { use super::set_env_if_not_set; - use crate::builder::s3_storage_options; + use crate::table::builder::s3_storage_options; use std::process::{Command, ExitStatus, Stdio}; /// Create a new bucket diff --git a/rust/src/writer/json.rs b/rust/src/writer/json.rs index fc98cb0b90..f8d6d1a9e3 100644 --- a/rust/src/writer/json.rs +++ b/rust/src/writer/json.rs @@ -3,16 +3,6 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; -use super::stats::create_add; -use super::utils::{ - arrow_schema_without_partitions, next_data_path, record_batch_from_message, - record_batch_without_partitions, stringified_partition_value, PartitionPath, -}; -use super::{DeltaWriter, DeltaWriterError}; -use crate::builder::DeltaTableBuilder; -use crate::{action::Add, DeltaTable, DeltaTableError, DeltaTableMetaData, Schema}; -use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; - use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::record_batch::*; use bytes::Bytes; @@ -26,6 +16,18 @@ use parquet::{ use serde_json::Value; use uuid::Uuid; +use super::stats::create_add; +use super::utils::{ + arrow_schema_without_partitions, next_data_path, record_batch_from_message, + record_batch_without_partitions, stringified_partition_value, +}; +use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError}; +use crate::errors::DeltaTableError; +use crate::table::builder::DeltaTableBuilder; +use crate::table::DeltaTableMetaData; +use crate::{protocol::Add, DeltaTable, Schema}; +use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; + type BadValue = (Value, ParquetError); /// Writes messages to a delta lake table. diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index 5685a71d48..8c5512127f 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -7,9 +7,9 @@ use object_store::Error as ObjectStoreError; use parquet::errors::ParquetError; use serde_json::Value; -use crate::action::{Action, Add, ColumnCountStat, DeltaOperation, SaveMode}; use crate::errors::DeltaTableError; use crate::operations::transaction::commit; +use crate::protocol::{Action, Add, ColumnCountStat, DeltaOperation, SaveMode}; use crate::DeltaTable; pub use json::JsonWriter; diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 986b7b4026..e6495b6539 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -4,37 +4,15 @@ //! Each Parquet file is buffered in-memory and only written once `flush()` is called on //! the writer. Once written, add actions are returned by the writer. It's the users responsibility //! to create the transaction using those actions. -//! -//! # Examples -//! -//! Write to an existing Delta Lake table: -//! ```rust ignore -//! 
let table = DeltaTable::try_from_uri("../path/to/table") -//! let batch: RecordBatch = ... -//! let mut writer = RecordBatchWriter::for_table(table, /*storage_options=*/ HashMap::new()) -//! writer.write(batch)?; -//! let actions: Vec = writer.flush()?.iter() -//! .map(|add| Action::add(add.into())) -//! .collect(); -//! let mut transaction = table.create_transaction(Some(DeltaTransactionOptions::new(/*max_retry_attempts=*/3))); -//! transaction.add_actions(actions); -//! async { -//! transaction.commit(Some(DeltaOperation::Write { -//! SaveMode::Append, -//! partitionBy: Some(table.get_metadata().partition_columns), -//! predicate: None, -//! })) -//! } -//! ``` + use std::{collections::HashMap, sync::Arc}; use arrow::array::{Array, UInt32Array}; use arrow::compute::{partition, take}; -use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; -use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use arrow_array::ArrayRef; use arrow_row::{RowConverter, SortField}; +use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; use object_store::{path::Path, ObjectStore}; use parquet::{arrow::ArrowWriter, errors::ParquetError}; @@ -47,9 +25,10 @@ use super::utils::{ stringified_partition_value, PartitionPath, ShareableBuffer, }; use super::{DeltaWriter, DeltaWriterError}; -use crate::builder::DeltaTableBuilder; use crate::errors::DeltaTableError; -use crate::{action::Add, storage::DeltaObjectStore, DeltaTable, DeltaTableMetaData, Schema}; +use crate::table::builder::DeltaTableBuilder; +use crate::table::DeltaTableMetaData; +use crate::{protocol::Add, storage::DeltaObjectStore, DeltaTable, Schema}; /// Writes messages to a delta lake table. pub struct RecordBatchWriter { diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index d96508d14f..0d369de46d 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -11,7 +11,7 @@ use parquet::{ }; use super::*; -use crate::action::{Add, ColumnValueStat, Stats}; +use crate::protocol::{Add, ColumnValueStat, Stats}; /// Creates an [`Add`] log action struct. pub fn create_add( @@ -479,9 +479,9 @@ mod tests { use super::utils::record_batch_from_message; use super::*; use crate::{ - action::{ColumnCountStat, ColumnValueStat}, - builder::DeltaTableBuilder, errors::DeltaTableError, + protocol::{ColumnCountStat, ColumnValueStat}, + table::builder::DeltaTableBuilder, DeltaTable, }; use lazy_static::lazy_static; diff --git a/rust/src/writer/test_utils.rs b/rust/src/writer/test_utils.rs index f519ebf720..d6c79bee94 100644 --- a/rust/src/writer/test_utils.rs +++ b/rust/src/writer/test_utils.rs @@ -1,3 +1,5 @@ +//! 
Utilities for writing unit tests + use std::collections::HashMap; use std::sync::Arc; @@ -5,10 +7,10 @@ use arrow::compute::take; use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray, StructArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; -use crate::delta::DeltaTableMetaData; use crate::operations::create::CreateBuilder; -use crate::schema::Schema; -use crate::{DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField, SchemaTypeStruct}; +use crate::schema::{Schema, SchemaTypeStruct}; +use crate::table::DeltaTableMetaData; +use crate::{DeltaTable, DeltaTableBuilder, SchemaDataType, SchemaField}; pub type TestResult = Result<(), Box>; diff --git a/rust/tests/checkpoint_writer.rs b/rust/tests/checkpoint_writer.rs index 999f07ff9c..10af3d9cfa 100644 --- a/rust/tests/checkpoint_writer.rs +++ b/rust/tests/checkpoint_writer.rs @@ -1,7 +1,6 @@ -#![deny(warnings)] - #[cfg(all(feature = "arrow", feature = "parquet"))] mod fs_common; +use deltalake::protocol::DeltaOperation; // NOTE: The below is a useful external command for inspecting the written checkpoint schema visually: // parquet-tools inspect tests/data/checkpoints/_delta_log/00000000000000000005.checkpoint.parquet @@ -91,7 +90,7 @@ mod delete_expired_delta_log_in_checkpoint { use ::object_store::path::Path as ObjectStorePath; use chrono::Utc; - use deltalake::delta_config::DeltaConfigKey; + use deltalake::table::config::DeltaConfigKey; use deltalake::*; use maplit::hashmap; @@ -212,8 +211,8 @@ mod checkpoints_with_tombstones { use super::*; use ::object_store::path::Path as ObjectStorePath; use chrono::Utc; - use deltalake::action::*; - use deltalake::delta_config::DeltaConfigKey; + use deltalake::protocol::*; + use deltalake::table::config::DeltaConfigKey; use deltalake::*; use maplit::hashmap; use parquet::file::reader::{FileReader, SerializedFileReader}; @@ -361,7 +360,6 @@ mod checkpoints_with_tombstones { .map(Action::remove) .chain(std::iter::once(Action::add(add.clone()))) .collect(); - let operation = DeltaOperation::Optimize { predicate: None, target_size: 1000000, diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index ced317d990..86fcd8f52e 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -12,7 +12,7 @@ mod common; #[tokio::test] #[serial] async fn test_filesystem_check_local() -> TestResult { - Ok(test_filesystem_check(StorageIntegration::Local).await?) + test_filesystem_check(StorageIntegration::Local).await } #[cfg(any(feature = "s3", feature = "s3-native-tls"))] @@ -21,21 +21,21 @@ async fn test_filesystem_check_local() -> TestResult { async fn test_filesystem_check_aws() -> TestResult { set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true"); set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none"); - Ok(test_filesystem_check(StorageIntegration::Amazon).await?) + test_filesystem_check(StorageIntegration::Amazon).await } #[cfg(feature = "azure")] #[tokio::test] #[serial] async fn test_filesystem_check_azure() -> TestResult { - Ok(test_filesystem_check(StorageIntegration::Microsoft).await?) + test_filesystem_check(StorageIntegration::Microsoft).await } #[cfg(feature = "gcs")] #[tokio::test] #[serial] async fn test_filesystem_check_gcp() -> TestResult { - Ok(test_filesystem_check(StorageIntegration::Google).await?) 
+ test_filesystem_check(StorageIntegration::Google).await } #[cfg(feature = "hdfs")] @@ -73,7 +73,7 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { assert_eq!(vec![file.to_string()], metrics.files_removed); let remove = table.state.all_tombstones().get(file).unwrap(); - assert_eq!(remove.data_change, true); + assert!(remove.data_change); // An additonal run should return an empty list of orphaned actions let op = DeltaOps::from(table); @@ -114,7 +114,7 @@ async fn test_filesystem_check_partitioned() -> TestResult { assert_eq!(vec![file.to_string()], metrics.files_removed); let remove = table.state.all_tombstones().get(file).unwrap(); - assert_eq!(remove.data_change, true); + assert!(remove.data_change); Ok(()) } @@ -170,7 +170,7 @@ async fn test_filesystem_check_outdated() -> TestResult { if let Err(DeltaTableError::VersionAlreadyExists(version)) = res { assert!(version == 3); } else { - assert!(false); + panic!(); } Ok(()) diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index 2d31e330a4..153d7f86d5 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -6,11 +6,11 @@ use std::{collections::HashMap, error::Error, sync::Arc}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use arrow_select::concat::concat_batches; -use deltalake::action::{Action, DeltaOperation, Remove}; use deltalake::errors::DeltaTableError; use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics, OptimizeType}; use deltalake::operations::transaction::commit; use deltalake::operations::DeltaOps; +use deltalake::protocol::{Action, DeltaOperation, Remove}; use deltalake::storage::ObjectStoreRef; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::{DeltaTable, PartitionFilter, Path, SchemaDataType, SchemaField}; diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs index cda9d5a257..8f8ac11ca1 100644 --- a/rust/tests/command_restore.rs +++ b/rust/tests/command_restore.rs @@ -4,7 +4,7 @@ use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; -use deltalake::action::SaveMode; +use deltalake::protocol::SaveMode; use deltalake::{DeltaOps, DeltaTable, SchemaDataType, SchemaField}; use rand::Rng; use std::collections::HashMap; diff --git a/rust/tests/commit_info_format.rs b/rust/tests/commit_info_format.rs index c50a40d818..fdb1f89d92 100644 --- a/rust/tests/commit_info_format.rs +++ b/rust/tests/commit_info_format.rs @@ -1,8 +1,8 @@ #![allow(dead_code)] mod fs_common; -use deltalake::action::{Action, DeltaOperation, SaveMode}; use deltalake::operations::transaction::commit; +use deltalake::protocol::{Action, DeltaOperation, SaveMode}; use serde_json::json; use std::error::Error; use tempdir::TempDir; diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index b54fae20d0..2ba20d0635 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -1,9 +1,9 @@ #![allow(dead_code, unused_variables)] use bytes::Bytes; -use deltalake::action::{self, Add, DeltaOperation, Remove, SaveMode}; use deltalake::operations::create::CreateBuilder; use deltalake::operations::transaction::commit; +use deltalake::protocol::{self, Add, DeltaOperation, Remove, SaveMode}; use deltalake::storage::DeltaObjectStore; use deltalake::DeltaTableBuilder; use deltalake::{DeltaTable, Schema}; @@ 
-140,7 +140,7 @@ pub async fn add_file( partition_by: None, predicate: None, }; - let actions = vec![action::Action::add(add)]; + let actions = vec![protocol::Action::add(add)]; commit( table.object_store().as_ref(), &actions, @@ -173,7 +173,7 @@ pub async fn remove_file( ..Default::default() }; let operation = DeltaOperation::Delete { predicate: None }; - let actions = vec![action::Action::remove(remove)]; + let actions = vec![protocol::Action::remove(remove)]; commit( table.object_store().as_ref(), &actions, diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index 566436ac9c..3c5ab39e2c 100644 --- a/rust/tests/fs_common/mod.rs +++ b/rust/tests/fs_common/mod.rs @@ -1,7 +1,7 @@ use chrono::Utc; -use deltalake::action::{Action, Add, DeltaOperation, Remove, SaveMode}; use deltalake::operations::create::CreateBuilder; use deltalake::operations::transaction::commit; +use deltalake::protocol::{Action, Add, DeltaOperation, Remove, SaveMode}; use deltalake::storage::{DeltaObjectStore, GetResult, ObjectStoreResult}; use deltalake::{DeltaTable, Schema, SchemaDataType, SchemaField}; use object_store::path::Path as StorePath; diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs index 314e9ce9a6..f34feac6e0 100644 --- a/rust/tests/integration_concurrent_writes.rs +++ b/rust/tests/integration_concurrent_writes.rs @@ -1,8 +1,8 @@ #![cfg(feature = "integration_test")] -use deltalake::action::{Action, Add, DeltaOperation, SaveMode}; use deltalake::operations::transaction::commit; use deltalake::operations::DeltaOps; +use deltalake::protocol::{Action, Add, DeltaOperation, SaveMode}; use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; use deltalake::{DeltaTable, DeltaTableBuilder, Schema, SchemaDataType, SchemaField}; use std::collections::HashMap; @@ -103,12 +103,12 @@ where assert_eq!(map.len() as i64, WORKERS * COMMITS); // check that we have unique and ascending versions committed - let mut versions = Vec::from_iter(map.keys().map(|x| x.clone())); + let mut versions = Vec::from_iter(map.keys().copied()); versions.sort(); assert_eq!(versions, Vec::from_iter(1i64..=WORKERS * COMMITS)); // check that each file for each worker is committed as expected - let mut files = Vec::from_iter(map.values().map(|x| x.clone())); + let mut files = Vec::from_iter(map.values().cloned()); files.sort(); let mut expected = Vec::new(); for w in 0..WORKERS { diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index 68c9149ac7..5aafe52e87 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -31,9 +31,9 @@ use datafusion_proto::bytes::{ }; use url::Url; -use deltalake::action::SaveMode; use deltalake::delta_datafusion::{DeltaPhysicalCodec, DeltaScan}; use deltalake::operations::create::CreateBuilder; +use deltalake::protocol::SaveMode; use deltalake::storage::DeltaObjectStore; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::{ @@ -51,7 +51,7 @@ mod local { #[tokio::test] #[serial] async fn test_datafusion_local() -> TestResult { - Ok(test_datafusion(StorageIntegration::Local).await?) + test_datafusion(StorageIntegration::Local).await } fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet