From 95a3aade9e143a86ebf11c05c21f2085d0624931 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 1 May 2023 12:25:10 +0200 Subject: [PATCH 1/2] chore: add deprecation notices for commit logic on DeltaTable --- python/src/lib.rs | 64 ++++++++++++++++++----------- rust/examples/recordbatch-writer.rs | 33 ++++----------- rust/src/delta.rs | 31 +++++++++++++- rust/src/delta_config.rs | 1 + rust/src/lib.rs | 1 + rust/src/operations/create.rs | 7 +++- rust/src/writer/mod.rs | 1 + rust/src/writer/test_utils.rs | 1 + rust/tests/command_optimize.rs | 54 +++++++----------------- rust/tests/commit_info_format.rs | 2 +- rust/tests/common/mod.rs | 3 +- rust/tests/fs_common/mod.rs | 1 + 12 files changed, 104 insertions(+), 95 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 19f594391d..98d8c0a92a 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -19,7 +19,9 @@ use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; -use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, Invariant, Schema}; +use deltalake::{ + DeltaConfigKey, DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaOps, Invariant, Schema, +}; use pyo3::create_exception; use pyo3::exceptions::PyException; use pyo3::exceptions::PyValueError; @@ -29,6 +31,8 @@ use pyo3::types::PyType; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; +use std::future::IntoFuture; +use std::str::FromStr; use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -740,33 +744,45 @@ fn write_new_deltalake( configuration: Option>>, storage_options: Option>, ) -> PyResult<()> { - let mut table = DeltaTableBuilder::from_uri(table_uri) + let table = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) .build() .map_err(PyDeltaTableError::from_raw)?; - let 
metadata = DeltaTableMetaData::new( - name, - description, - None, // Format - (&schema.0) - .try_into() - .map_err(PyDeltaTableError::from_arrow)?, - partition_by, - configuration.unwrap_or_default(), - ); - - let fut = table.create( - metadata, - action::Protocol { - min_reader_version: 1, - min_writer_version: 1, // TODO: Make sure we comply with protocol - }, - None, // TODO - Some(add_actions.iter().map(|add| add.into()).collect()), - ); - - rt()?.block_on(fut).map_err(PyDeltaTableError::from_raw)?; + let schema: Schema = (&schema.0) + .try_into() + .map_err(PyDeltaTableError::from_arrow)?; + + let configuration = configuration + .unwrap_or_default() + .into_iter() + .filter_map(|(key, value)| { + if let Ok(key) = DeltaConfigKey::from_str(&key) { + Some((key, value)) + } else { + None + } + }) + .collect(); + + let mut builder = DeltaOps(table) + .create() + .with_columns(schema.get_fields().clone()) + .with_partition_columns(partition_by) + .with_configuration(configuration) + .with_actions(add_actions.iter().map(|add| Action::add(add.into()))); + + if let Some(name) = &name { + builder = builder.with_table_name(name); + }; + + if let Some(description) = &description { + builder = builder.with_comment(description); + }; + + rt()? 
+ .block_on(builder.into_future()) + .map_err(PyDeltaTableError::from_raw)?; Ok(()) } diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs index 4943daa8e4..d490a2dad7 100644 --- a/rust/examples/recordbatch-writer.rs +++ b/rust/examples/recordbatch-writer.rs @@ -8,7 +8,6 @@ */ use chrono::prelude::*; -use deltalake::action::*; use deltalake::arrow::array::*; use deltalake::arrow::record_batch::RecordBatch; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; @@ -75,8 +74,8 @@ struct WeatherRecord { } impl WeatherRecord { - fn schema() -> Schema { - Schema::new(vec![ + fn columns() -> Vec { + vec![ SchemaField::new( "timestamp".to_string(), SchemaDataType::primitive("timestamp".to_string()), @@ -101,7 +100,7 @@ impl WeatherRecord { true, HashMap::new(), ), - ]) + ] } } @@ -189,27 +188,11 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec) -> RecordB * Table in an existing directory that doesn't currently contain a Delta table */ async fn create_initialized_table(table_path: &Path) -> DeltaTable { - let mut table = DeltaTableBuilder::from_uri(table_path).build().unwrap(); - let table_schema = WeatherRecord::schema(); - let mut commit_info = serde_json::Map::::new(); - commit_info.insert( - "operation".to_string(), - serde_json::Value::String("CREATE TABLE".to_string()), - ); - commit_info.insert( - "userName".to_string(), - serde_json::Value::String("test user".to_string()), - ); - - let protocol = Protocol { - min_reader_version: 1, - min_writer_version: 1, - }; - - let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new()); - - table - .create(metadata, protocol, Some(commit_info), None) + let table = DeltaOps::try_from_uri(table_path) + .await + .unwrap() + .create() + .with_columns(WeatherRecord::columns()) .await .unwrap(); diff --git a/rust/src/delta.rs b/rust/src/delta.rs index b7d6cb66aa..a8ea919fe1 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1112,6 +1112,10 @@ impl 
DeltaTable { } /// Vacuum the delta table. See [`VacuumBuilder`] for more information. + #[deprecated( + since = "0.10.0", + note = "use DeltaOps from operations module to create a Vacuum operation." + )] pub async fn vacuum( &mut self, retention_hours: Option, @@ -1133,6 +1137,11 @@ impl DeltaTable { /// Creates a new DeltaTransaction for the DeltaTable. /// The transaction holds a mutable reference to the DeltaTable, preventing other references /// until the transaction is dropped. + #[deprecated( + since = "0.10.0", + note = "use 'commit' function from operations module to commit to Delta table." + )] + #[allow(deprecated)] pub fn create_transaction( &mut self, options: Option, @@ -1145,6 +1154,11 @@ impl DeltaTable { /// This is low-level transaction API. If user does not want to maintain the commit loop then /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` /// with retry logic. + #[deprecated( + since = "0.10.0", + note = "use 'commit' function from operations module to commit to Delta table." + )] + #[allow(deprecated)] pub async fn try_commit_transaction( &mut self, commit: &PreparedCommit, @@ -1168,6 +1182,11 @@ impl DeltaTable { } /// Create a DeltaTable with version 0 given the provided MetaData, Protocol, and CommitInfo + #[deprecated( + since = "0.10.0", + note = "use DeltaOps from operations module to create a Create operation." + )] + #[allow(deprecated)] pub async fn create( &mut self, metadata: DeltaTableMetaData, @@ -1322,12 +1341,17 @@ impl Default for DeltaTransactionOptions { /// Please not that in case of non-retryable error the temporary commit file such as /// `_delta_log/_commit_.json` will orphaned in storage. #[derive(Debug)] +#[deprecated( + since = "0.10.0", + note = "use 'commit' function from operations module to commit to Delta table."
+)] pub struct DeltaTransaction<'a> { delta_table: &'a mut DeltaTable, actions: Vec, options: DeltaTransactionOptions, } +#[allow(deprecated)] impl<'a> DeltaTransaction<'a> { /// Creates a new delta transaction. /// Holds a mutable reference to the delta table to prevent outside mutation while a transaction commit is in progress. @@ -1377,7 +1401,6 @@ impl<'a> DeltaTransaction<'a> { // } else { // IsolationLevel::Serializable // }; - let prepared_commit = self.prepare_commit(operation, app_metadata).await?; // try to commit in a loop in case other writers write the next version first @@ -1430,6 +1453,7 @@ impl<'a> DeltaTransaction<'a> { Ok(PreparedCommit { uri: path }) } + #[allow(deprecated)] async fn try_commit_loop( &mut self, commit: &PreparedCommit, @@ -1473,6 +1497,10 @@ impl<'a> DeltaTransaction<'a> { /// Holds the uri to prepared commit temporary file created with `DeltaTransaction.prepare_commit`. /// Once created, the actual commit could be executed with `DeltaTransaction.try_commit`. #[derive(Debug)] +#[deprecated( + since = "0.10.0", + note = "use 'commit' function from operations module to commit to Delta table." +)] pub struct PreparedCommit { uri: Path, } @@ -1624,6 +1652,7 @@ mod tests { serde_json::Value::String("test user".to_string()), ); // Action + #[allow(deprecated)] dt.create(delta_md.clone(), protocol.clone(), Some(commit_info), None) .await .unwrap(); diff --git a/rust/src/delta_config.rs b/rust/src/delta_config.rs index df0a13b455..722f04d5ae 100644 --- a/rust/src/delta_config.rs +++ b/rust/src/delta_config.rs @@ -10,6 +10,7 @@ use crate::DeltaTableError; /// Typed property keys that can be defined on a delta table /// /// +#[derive(PartialEq, Eq, Hash)] pub enum DeltaConfigKey { /// true for this Delta table to be append-only. If append-only, /// existing records cannot be deleted, and existing values cannot be updated. 
diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 43a3277ae6..ae6ea740df 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -109,6 +109,7 @@ pub mod writer; pub use self::builder::*; pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError}; pub use self::delta::*; +pub use self::delta_config::*; pub use self::partitions::*; pub use self::schema::*; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 9384aa4767..5598759a19 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -125,8 +125,11 @@ impl CreateBuilder { } /// Specify columns to append to schema - pub fn with_columns(mut self, columns: impl IntoIterator) -> Self { - self.columns.extend(columns); + pub fn with_columns( + mut self, + columns: impl IntoIterator>, + ) -> Self { + self.columns.extend(columns.into_iter().map(|c| c.into())); self } diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index b590a6444e..3c3c40d0aa 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -128,6 +128,7 @@ pub trait DeltaWriter { table: &mut DeltaTable, ) -> Result { let mut adds = self.flush().await?; + #[allow(deprecated)] let mut tx = table.create_transaction(None); tx.add_actions(adds.drain(..).map(Action::add).collect()); let version = tx.commit(None, None).await?; diff --git a/rust/src/writer/test_utils.rs b/rust/src/writer/test_utils.rs index 62684c3f05..2cb66ac893 100644 --- a/rust/src/writer/test_utils.rs +++ b/rust/src/writer/test_utils.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] //! 
Utilities for writing unit tests use super::*; use crate::{ diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index 81a292867a..254757293d 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -6,18 +6,14 @@ use arrow::{ datatypes::{DataType, Field}, record_batch::RecordBatch, }; -use deltalake::action::{Action, Protocol, Remove}; -use deltalake::builder::DeltaTableBuilder; +use deltalake::action::{Action, Remove}; use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics}; use deltalake::operations::DeltaOps; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; -use deltalake::{ - DeltaTable, DeltaTableError, DeltaTableMetaData, PartitionFilter, Schema, SchemaDataType, - SchemaField, -}; +use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, SchemaDataType, SchemaField}; use parquet::file::properties::WriterProperties; use rand::prelude::*; -use serde_json::{json, Map, Value}; +use serde_json::json; use std::time::SystemTime; use std::time::UNIX_EPOCH; use std::{collections::HashMap, error::Error, sync::Arc}; @@ -29,7 +25,7 @@ struct Context { } async fn setup_test(partitioned: bool) -> Result> { - let schema = Schema::new(vec![ + let columns = vec![ SchemaField::new( "x".to_owned(), SchemaDataType::primitive("integer".to_owned()), @@ -48,45 +44,22 @@ async fn setup_test(partitioned: bool) -> Result> { false, HashMap::new(), ), - ]); + ]; - let p = if partitioned { + let partition_columns = if partitioned { vec!["date".to_owned()] } else { vec![] }; - let table_meta = DeltaTableMetaData::new( - Some("opt_table".to_owned()), - Some("Table for optimize tests".to_owned()), - None, - schema.clone(), - p, - HashMap::new(), - ); - let tmp_dir = tempdir::TempDir::new("opt_table").unwrap(); - let p = tmp_dir.path().to_str().to_owned().unwrap(); - let mut dt = DeltaTableBuilder::from_uri(p).build()?; - - let mut commit_info = Map::::new(); - - let protocol = Protocol { - 
min_reader_version: 1, - min_writer_version: 2, - }; - - commit_info.insert( - "operation".to_string(), - serde_json::Value::String("CREATE TABLE".to_string()), - ); - dt.create( - table_meta.clone(), - protocol.clone(), - Some(commit_info), - None, - ) - .await?; + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + let dt = DeltaOps::try_from_uri(table_uri) + .await? + .create() + .with_columns(columns) + .with_partition_columns(partition_columns) + .await?; Ok(Context { tmp_dir, table: dt }) } @@ -314,6 +287,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { tags: Some(HashMap::new()), }; + #[allow(deprecated)] let mut transaction = other_dt.create_transaction(None); transaction.add_action(Action::remove(remove)); transaction.commit(None, None).await?; diff --git a/rust/tests/commit_info_format.rs b/rust/tests/commit_info_format.rs index 60a8bf5ee4..200178f849 100644 --- a/rust/tests/commit_info_format.rs +++ b/rust/tests/commit_info_format.rs @@ -1,4 +1,4 @@ -#[allow(dead_code)] +#![allow(dead_code, deprecated)] mod fs_common; use deltalake::action::{Action, DeltaOperation, SaveMode}; diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index a2c5dacc05..0bcb7f02a0 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -1,5 +1,4 @@ -#![allow(dead_code)] -#![allow(unused_variables)] +#![allow(dead_code, unused_variables, deprecated)] use bytes::Bytes; use deltalake::action::{self, Add, Remove}; diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index 7d1383afd9..4243afbe98 100644 --- a/rust/tests/fs_common/mod.rs +++ b/rust/tests/fs_common/mod.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] use chrono::Utc; use deltalake::action::{Action, Add, Protocol, Remove}; use deltalake::{ From f406873841ca06eea2c87e804a344d26ba331309 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 1 May 2023 19:52:55 +0200 Subject: [PATCH 2/2] fix: invert filter additional actions in create command --- 
python/tests/test_writer.py | 4 ++-- rust/src/operations/create.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 5f8838b55b..6e6c2efb45 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -144,7 +144,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): sample_data, name="test_name", description="test_desc", - configuration={"configTest": "foobar"}, + configuration={"delta.appendOnly": "false"}, ) delta_table = DeltaTable(tmp_path) @@ -153,7 +153,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table): assert metadata.name == "test_name" assert metadata.description == "test_desc" - assert metadata.configuration == {"configTest": "foobar"} + assert metadata.configuration == {"delta.appendOnly": "false"} @pytest.mark.parametrize( diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 5598759a19..e17eec9f7a 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -272,7 +272,7 @@ impl CreateBuilder { actions.extend( self.actions .into_iter() - .filter(|a| matches!(a, Action::protocol(_))), + .filter(|a| !matches!(a, Action::protocol(_))), ); Ok((table, actions, operation))