Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: re-organize top level modules #1434

Merged
merged 26 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4b3ccf5
chore: remove deprecated create function on DeltaTable
roeap Jun 4, 2023
9a6d87d
chore: remove deprecated vacuum function on DeltaTable
roeap Jun 4, 2023
57d0835
chore: remove deprecated commit 1/n
roeap Jun 4, 2023
05c51a9
fix: actually load table in concurrency tests.
roeap Jun 4, 2023
e56f003
refactor: move table into separate module
roeap Jun 4, 2023
d1e0b32
refactor: rename arrow module
roeap Jun 4, 2023
cf8c104
fix: test import
roeap Jun 4, 2023
17fb81b
fix: clippy
roeap Jun 5, 2023
1fb080c
fix: docs link
roeap Jun 6, 2023
1d5a9d2
refactor: move all schema-related into schema module
roeap Jun 6, 2023
891754a
refactor: move table config into table module
roeap Jun 6, 2023
08a6a67
fix: inclerease commit attempts
roeap Jun 6, 2023
00af840
refactor: be more deliberate about exports
roeap Jun 6, 2023
56bbd21
refactor: move time_utils into action module
roeap Jun 6, 2023
be7f961
fix: docs links
roeap Jun 6, 2023
c785358
reafctor: rename actions to protocol
roeap Jun 6, 2023
e5c323a
fix: parquet2 imports
roeap Jun 6, 2023
4bacb42
Merge branch 'main' into module-cleanup
roeap Jun 7, 2023
53f8df4
Merge branch 'main' into module-cleanup
roeap Jun 12, 2023
2747acb
Merge branch 'main' into module-cleanup
roeap Jun 14, 2023
3b810cb
Merge branch 'main' into module-cleanup
roeap Jun 20, 2023
6d11b68
Merge branch 'main' into module-cleanup
roeap Jul 17, 2023
c9bd37c
fix: imports after merge
roeap Jul 17, 2023
22b500d
Merge branch 'main' into module-cleanup
roeap Sep 23, 2023
2a425d7
chore: various fixes and cleanup after merge
roeap Sep 23, 2023
01cf2da
Merge branch 'main' into module-cleanup
roeap Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use deltalake::action::{
use deltalake::arrow::compute::concat_batches;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
Expand All @@ -28,6 +27,7 @@ use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, Invariant, Schema};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
Expand Down
4 changes: 2 additions & 2 deletions rust/benches/read_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use deltalake::delta::DeltaTableConfig;
use deltalake::table_state::DeltaTableState;
use deltalake::state::DeltaTableState;
use deltalake::DeltaTableConfig;
use std::fs::File;
use std::io::Read;

Expand Down
4 changes: 2 additions & 2 deletions rust/src/action/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::iter::Iterator;
use std::ops::Add;

use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn};
use crate::delta_arrow::delta_log_schema_for_table;
use crate::arrow_convert::delta_log_schema_for_table;
use crate::schema::*;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable};

type SchemaPath = Vec<String>;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

use crate::delta_config::IsolationLevel;
use crate::config::IsolationLevel;
use crate::errors::DeltaResult;
use crate::storage::ObjectStoreRef;
use crate::{delta::CheckPoint, schema::*, DeltaTableMetaData};
use crate::{schema::*, table::CheckPoint, DeltaTableMetaData};

/// Error returned when an invalid Delta log action is encountered.
#[allow(missing_docs)]
Expand Down
2 changes: 1 addition & 1 deletion rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::action::{self, Add};
use crate::builder::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType};

impl From<DeltaTableError> for DataFusionError {
Expand Down
18 changes: 4 additions & 14 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,34 +83,24 @@ compile_error!(
);

pub mod action;
pub mod builder;
pub mod data_catalog;
pub mod delta;
pub mod delta_config;
pub mod errors;
pub mod operations;
pub mod partitions;
pub mod schema;
pub mod storage;
pub mod table_state;
pub mod table;
pub mod time_utils;

#[cfg(all(feature = "arrow"))]
pub mod table_state_arrow;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod delta_arrow;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

pub use self::builder::*;
pub use self::config::*;
pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
pub use self::delta::*;
pub use self::delta_config::*;
pub use self::partitions::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::*;
pub use errors::*;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::builder::ensure_table_uri;
use crate::delta_config::DeltaConfigKey;
use crate::config::DeltaConfigKey;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
Expand Down Expand Up @@ -322,7 +322,7 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
use crate::delta_config::DeltaConfigKey;
use crate::config::DeltaConfigKey;
use crate::operations::DeltaOps;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

/// Audit the Delta Table's active files with the underlying file system.
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::future::BoxFuture;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::writer::{PartitionWriter, PartitionWriterConfig};
use crate::action::{self, Action, DeltaOperation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::writer::utils::arrow_schema_without_partitions;
use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};

Expand Down
33 changes: 23 additions & 10 deletions rust/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use object_store::ObjectStore;

use super::CommitInfo;
use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove};
use crate::delta_config::IsolationLevel;
use crate::config::IsolationLevel;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::commit_uri_from_version;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;

#[cfg(feature = "datafusion")]
use super::state::AddContainer;
Expand Down Expand Up @@ -53,8 +53,8 @@ pub enum CommitConflictError {
/// you may need to upgrade your Delta Lake version.
/// - When multiple writers are creating or replacing a table at the same time.
/// - When multiple writers are writing to an empty path at the same time.
#[error("Protocol changed since last commit.")]
ProtocolChanged,
#[error("Protocol changed since last commit: {0}")]
ProtocolChanged(String),

/// Error returned when the table requires an unsupported writer version
#[error("Delta-rs does not support writer version {0}")]
Expand Down Expand Up @@ -392,10 +392,18 @@ impl<'a> ConflictChecker<'a> {
/// to read and write against the protocol set by the committed transaction.
fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
for p in self.winning_commit_summary.protocol() {
if self.txn_info.read_snapshot.min_reader_version() < p.min_reader_version
|| self.txn_info.read_snapshot.min_writer_version() < p.min_writer_version
{
return Err(CommitConflictError::ProtocolChanged);
let (win_read, curr_read) = (
p.min_reader_version,
self.txn_info.read_snapshot.min_reader_version(),
);
let (win_write, curr_write) = (
p.min_writer_version,
self.txn_info.read_snapshot.min_writer_version(),
);
if curr_read < win_read || win_write < curr_write {
return Err(CommitConflictError::ProtocolChanged(
format!("reqired read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"),
));
};
}
if !self.winning_commit_summary.protocol().is_empty()
Expand All @@ -405,7 +413,9 @@ impl<'a> ConflictChecker<'a> {
.iter()
.any(|a| matches!(a, Action::protocol(_)))
{
return Err(CommitConflictError::ProtocolChanged);
return Err(CommitConflictError::ProtocolChanged(
"protocol changed".into(),
));
};
Ok(())
}
Expand Down Expand Up @@ -818,7 +828,10 @@ mod tests {
vec![tu::create_protocol_action(None, None)],
false,
);
assert!(matches!(result, Err(CommitConflictError::ProtocolChanged)));
assert!(matches!(
result,
Err(CommitConflictError::ProtocolChanged(_))
));

// taint whole table
// `read_whole_table` should disallow any concurrent change, even if the change
Expand Down
5 changes: 3 additions & 2 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::action::{Action, CommitInfo, DeltaOperation};
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::commit_uri_from_version;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;

mod conflict_checker;
#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -164,7 +164,8 @@ pub async fn commit(
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;

let max_attempts = 5;
// TODO make max attempts configurable
let max_attempts = 25;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increasing this was needed to make our concurrent writer tests pass.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh. I guess 5 workers trying to make 3 commits. That mean one potentially has to fail 14 times?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 was not cutting it for s3, so at least that...

let mut attempt_number = 1;

while attempt_number <= max_attempts {
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::delta_datafusion::{
get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value,
};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;

impl DeltaTableState {
/// Get the table schema as an [`ArrowSchemaRef`]
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(unused)]
use super::{prepare_commit, try_commit_transaction, CommitInfo};
use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove, SaveMode};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::{
DeltaTable, DeltaTableBuilder, DeltaTableMetaData, Schema, SchemaDataType, SchemaField,
};
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::{path::Path, ObjectStore};

use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

/// Errors that can occur during vacuum
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::MAX_SUPPORTED_WRITER_VERSION;
use super::{transaction::commit, CreateBuilder};
use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode};
use crate::delta::DeltaTable;
use crate::delta_datafusion::DeltaDataChecker;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::schema::Schema;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::writer::record_batch::divide_by_partition_values;
use crate::writer::utils::PartitionPath;
use crate::DeltaTable;

#[derive(thiserror::Error, Debug)]
enum WriteError {
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions rust/src/schema.rs → rust/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use std::collections::HashMap;

use crate::errors::DeltaTableError;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod arrow_convert;
pub mod partitions;

/// Type alias for a string expected to match a GUID/UUID format
pub type Guid = String;

Expand Down
54 changes: 25 additions & 29 deletions rust/src/partitions.rs → rust/src/schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::convert::TryFrom;

use super::schema::SchemaDataType;
use super::SchemaDataType;
use crate::errors::DeltaTableError;
use std::cmp::Ordering;
use std::collections::HashMap;
Expand Down Expand Up @@ -189,20 +189,18 @@ pub struct DeltaTablePartition<'a> {
pub value: &'a str,
}

/**
* Create a DeltaTable partition from a HivePartition string.
*
* A HivePartition string is represented by a "key=value" format.
*
* ```rust
* use deltalake::DeltaTablePartition;
*
* let hive_part = "ds=2023-01-01";
* let partition = DeltaTablePartition::try_from(hive_part).unwrap();
* assert_eq!("ds", partition.key);
* assert_eq!("2023-01-01", partition.value);
* ```
*/
/// Create a DeltaTable partition from a HivePartition string.
///
/// A HivePartition string is represented by a "key=value" format.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
///
/// let hive_part = "ds=2023-01-01";
/// let partition = DeltaTablePartition::try_from(hive_part).unwrap();
/// assert_eq!("ds", partition.key);
/// assert_eq!("2023-01-01", partition.value);
/// ```
impl<'a> TryFrom<&'a str> for DeltaTablePartition<'a> {
type Error = DeltaTableError;

Expand All @@ -223,20 +221,18 @@ impl<'a> TryFrom<&'a str> for DeltaTablePartition<'a> {
}

impl<'a> DeltaTablePartition<'a> {
/**
* Try to create a DeltaTable partition from a partition value kv pair.
*
* ```rust
* use deltalake::DeltaTablePartition;
*
* let value = (&"ds".to_string(), &Some("2023-01-01".to_string()));
* let null_default = "1979-01-01";
* let partition = DeltaTablePartition::from_partition_value(value, null_default);
*
* assert_eq!("ds", partition.key);
* assert_eq!("2023-01-01", partition.value);
* ```
*/
/// Try to create a DeltaTable partition from a partition value kv pair.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
///
/// let value = (&"ds".to_string(), &Some("2023-01-01".to_string()));
/// let null_default = "1979-01-01";
/// let partition = DeltaTablePartition::from_partition_value(value, null_default);
///
/// assert_eq!("ds", partition.key);
/// assert_eq!("2023-01-01", partition.value);
/// ```
pub fn from_partition_value(
partition_value: (&'a String, &'a Option<String>),
default_for_null: &'a str,
Expand Down
2 changes: 1 addition & 1 deletion rust/src/builder.rs → rust/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use url::Url;

use crate::delta::DeltaTable;
use super::DeltaTable;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::config::StorageOptions;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
Expand Down
File renamed without changes.
Loading