Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add deprecation notices for commit logic on DeltaTable #1323

Merged
merged 2 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, Invariant, Schema};
use deltalake::{
DeltaConfigKey, DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaOps, Invariant, Schema,
};
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
Expand All @@ -29,6 +31,8 @@ use pyo3::types::PyType;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::future::IntoFuture;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -740,33 +744,45 @@ fn write_new_deltalake(
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build()
.map_err(PyDeltaTableError::from_raw)?;

let metadata = DeltaTableMetaData::new(
name,
description,
None, // Format
(&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?,
partition_by,
configuration.unwrap_or_default(),
);

let fut = table.create(
metadata,
action::Protocol {
min_reader_version: 1,
min_writer_version: 1, // TODO: Make sure we comply with protocol
},
None, // TODO
Some(add_actions.iter().map(|add| add.into()).collect()),
);

rt()?.block_on(fut).map_err(PyDeltaTableError::from_raw)?;
let schema: Schema = (&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?;

let configuration = configuration
.unwrap_or_default()
.into_iter()
.filter_map(|(key, value)| {
if let Ok(key) = DeltaConfigKey::from_str(&key) {
Some((key, value))
} else {
None
}
})
.collect();
Comment on lines +756 to +766
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure how we want to handle table configuration vs. metadata. With this change only known configuration can be passed to delta table, but as of right now there is no way to pass in metadata. First question would be how we want to deal with invalid config. While the code just silently ignores it, I am starting to lean towards raising an error instead. For metadata we would need to introduce a new parameter.


let mut builder = DeltaOps(table)
.create()
.with_columns(schema.get_fields().clone())
.with_partition_columns(partition_by)
.with_configuration(configuration)
.with_actions(add_actions.iter().map(|add| Action::add(add.into())));

if let Some(name) = &name {
builder = builder.with_table_name(name);
};

if let Some(description) = &description {
builder = builder.with_comment(description);
};

rt()?
.block_on(builder.into_future())
.map_err(PyDeltaTableError::from_raw)?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
sample_data,
name="test_name",
description="test_desc",
configuration={"configTest": "foobar"},
configuration={"delta.appendOnly": "false"},
)

delta_table = DeltaTable(tmp_path)
Expand All @@ -153,7 +153,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):

assert metadata.name == "test_name"
assert metadata.description == "test_desc"
assert metadata.configuration == {"configTest": "foobar"}
assert metadata.configuration == {"delta.appendOnly": "false"}


@pytest.mark.parametrize(
Expand Down
33 changes: 8 additions & 25 deletions rust/examples/recordbatch-writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

use chrono::prelude::*;
use deltalake::action::*;
use deltalake::arrow::array::*;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
Expand Down Expand Up @@ -75,8 +74,8 @@ struct WeatherRecord {
}

impl WeatherRecord {
fn schema() -> Schema {
Schema::new(vec![
fn columns() -> Vec<SchemaField> {
vec![
SchemaField::new(
"timestamp".to_string(),
SchemaDataType::primitive("timestamp".to_string()),
Expand All @@ -101,7 +100,7 @@ impl WeatherRecord {
true,
HashMap::new(),
),
])
]
}
}

Expand Down Expand Up @@ -189,27 +188,11 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordB
* Table in an existing directory that doesn't currently contain a Delta table
*/
async fn create_initialized_table(table_path: &Path) -> DeltaTable {
let mut table = DeltaTableBuilder::from_uri(table_path).build().unwrap();
let table_schema = WeatherRecord::schema();
let mut commit_info = serde_json::Map::<String, serde_json::Value>::new();
commit_info.insert(
"operation".to_string(),
serde_json::Value::String("CREATE TABLE".to_string()),
);
commit_info.insert(
"userName".to_string(),
serde_json::Value::String("test user".to_string()),
);

let protocol = Protocol {
min_reader_version: 1,
min_writer_version: 1,
};

let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new());

table
.create(metadata, protocol, Some(commit_info), None)
let table = DeltaOps::try_from_uri(table_path)
.await
.unwrap()
.create()
.with_columns(WeatherRecord::columns())
.await
.unwrap();

Expand Down
31 changes: 30 additions & 1 deletion rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,10 @@ impl DeltaTable {
}

/// Vacuum the delta table. See [`VacuumBuilder`] for more information.
#[deprecated(
since = "0.10.0",
note = "use DelaOps from operations module to create a Vacuum operation."
)]
pub async fn vacuum(
&mut self,
retention_hours: Option<u64>,
Expand All @@ -1133,6 +1137,11 @@ impl DeltaTable {
/// Creates a new DeltaTransaction for the DeltaTable.
/// The transaction holds a mutable reference to the DeltaTable, preventing other references
/// until the transaction is dropped.
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
#[allow(deprecated)]
pub fn create_transaction(
&mut self,
options: Option<DeltaTransactionOptions>,
Expand All @@ -1145,6 +1154,11 @@ impl DeltaTable {
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
/// with retry logic.
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commite to Delta table."
)]
#[allow(deprecated)]
pub async fn try_commit_transaction(
&mut self,
commit: &PreparedCommit,
Expand All @@ -1168,6 +1182,11 @@ impl DeltaTable {
}

/// Create a DeltaTable with version 0 given the provided MetaData, Protocol, and CommitInfo
#[deprecated(
since = "0.10.0",
note = "use DelaOps from operations module to create a Create operation."
)]
#[allow(deprecated)]
pub async fn create(
&mut self,
metadata: DeltaTableMetaData,
Expand Down Expand Up @@ -1322,12 +1341,17 @@ impl Default for DeltaTransactionOptions {
/// Please not that in case of non-retryable error the temporary commit file such as
/// `_delta_log/_commit_<uuid>.json` will orphaned in storage.
#[derive(Debug)]
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
pub struct DeltaTransaction<'a> {
delta_table: &'a mut DeltaTable,
actions: Vec<Action>,
options: DeltaTransactionOptions,
}

#[allow(deprecated)]
impl<'a> DeltaTransaction<'a> {
/// Creates a new delta transaction.
/// Holds a mutable reference to the delta table to prevent outside mutation while a transaction commit is in progress.
Expand Down Expand Up @@ -1377,7 +1401,6 @@ impl<'a> DeltaTransaction<'a> {
// } else {
// IsolationLevel::Serializable
// };

let prepared_commit = self.prepare_commit(operation, app_metadata).await?;

// try to commit in a loop in case other writers write the next version first
Expand Down Expand Up @@ -1430,6 +1453,7 @@ impl<'a> DeltaTransaction<'a> {
Ok(PreparedCommit { uri: path })
}

#[allow(deprecated)]
async fn try_commit_loop(
&mut self,
commit: &PreparedCommit,
Expand Down Expand Up @@ -1473,6 +1497,10 @@ impl<'a> DeltaTransaction<'a> {
/// Holds the uri to prepared commit temporary file created with `DeltaTransaction.prepare_commit`.
/// Once created, the actual commit could be executed with `DeltaTransaction.try_commit`.
#[derive(Debug)]
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
pub struct PreparedCommit {
uri: Path,
}
Expand Down Expand Up @@ -1624,6 +1652,7 @@ mod tests {
serde_json::Value::String("test user".to_string()),
);
// Action
#[allow(deprecated)]
dt.create(delta_md.clone(), protocol.clone(), Some(commit_info), None)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/src/delta_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::DeltaTableError;
/// Typed property keys that can be defined on a delta table
/// <https://docs.delta.io/latest/table-properties.html#delta-table-properties-reference>
/// <https://learn.microsoft.com/en-us/azure/databricks/delta/table-properties>
#[derive(PartialEq, Eq, Hash)]
pub enum DeltaConfigKey {
/// true for this Delta table to be append-only. If append-only,
/// existing records cannot be deleted, and existing values cannot be updated.
Expand Down
1 change: 1 addition & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub mod writer;
pub use self::builder::*;
pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
pub use self::delta::*;
pub use self::delta_config::*;
pub use self::partitions::*;
pub use self::schema::*;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
Expand Down
9 changes: 6 additions & 3 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ impl CreateBuilder {
}

/// Specify columns to append to schema
pub fn with_columns(mut self, columns: impl IntoIterator<Item = SchemaField>) -> Self {
self.columns.extend(columns);
pub fn with_columns(
mut self,
columns: impl IntoIterator<Item = impl Into<SchemaField>>,
) -> Self {
self.columns.extend(columns.into_iter().map(|c| c.into()));
self
}

Expand Down Expand Up @@ -269,7 +272,7 @@ impl CreateBuilder {
actions.extend(
self.actions
.into_iter()
.filter(|a| matches!(a, Action::protocol(_))),
.filter(|a| !matches!(a, Action::protocol(_))),
);

Ok((table, actions, operation))
Expand Down
1 change: 1 addition & 0 deletions rust/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub trait DeltaWriter<T> {
table: &mut DeltaTable,
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
let mut adds = self.flush().await?;
#[allow(deprecated)]
let mut tx = table.create_transaction(None);
tx.add_actions(adds.drain(..).map(Action::add).collect());
let version = tx.commit(None, None).await?;
Expand Down
1 change: 1 addition & 0 deletions rust/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(deprecated)]
//! Utilities for writing unit tests
use super::*;
use crate::{
Expand Down
Loading