refactor: re-organize top level modules #1434

Merged 26 commits on Sep 26, 2023.

Changes from all commits (26 commits):
4b3ccf5  chore: remove deprecated create function on DeltaTable (roeap, Jun 4, 2023)
9a6d87d  chore: remove deprecated vacuum function on DeltaTable (roeap, Jun 4, 2023)
57d0835  chore: remove deprecated commit 1/n (roeap, Jun 4, 2023)
05c51a9  fix: actually load table in concurrency tests (roeap, Jun 4, 2023)
e56f003  refactor: move table into separate module (roeap, Jun 4, 2023)
d1e0b32  refactor: rename arrow module (roeap, Jun 4, 2023)
cf8c104  fix: test import (roeap, Jun 4, 2023)
17fb81b  fix: clippy (roeap, Jun 5, 2023)
1fb080c  fix: docs link (roeap, Jun 6, 2023)
1d5a9d2  refactor: move all schema-related into schema module (roeap, Jun 6, 2023)
891754a  refactor: move table config into table module (roeap, Jun 6, 2023)
08a6a67  fix: increase commit attempts (roeap, Jun 6, 2023)
00af840  refactor: be more deliberate about exports (roeap, Jun 6, 2023)
56bbd21  refactor: move time_utils into action module (roeap, Jun 6, 2023)
be7f961  fix: docs links (roeap, Jun 6, 2023)
c785358  refactor: rename actions to protocol (roeap, Jun 6, 2023)
e5c323a  fix: parquet2 imports (roeap, Jun 6, 2023)
4bacb42  Merge branch 'main' into module-cleanup (roeap, Jun 7, 2023)
53f8df4  Merge branch 'main' into module-cleanup (roeap, Jun 12, 2023)
2747acb  Merge branch 'main' into module-cleanup (roeap, Jun 14, 2023)
3b810cb  Merge branch 'main' into module-cleanup (roeap, Jun 20, 2023)
6d11b68  Merge branch 'main' into module-cleanup (roeap, Jul 17, 2023)
c9bd37c  fix: imports after merge (roeap, Jul 17, 2023)
22b500d  Merge branch 'main' into module-cleanup (roeap, Sep 23, 2023)
2a425d7  chore: various fixes and cleanup after merge (roeap, Sep 23, 2023)
01cf2da  Merge branch 'main' into module-cleanup (roeap, Sep 25, 2023)
python/src/error.rs (1 addition, 1 deletion)
@@ -1,5 +1,5 @@
use arrow_schema::ArrowError;
use deltalake::action::ProtocolError;
use deltalake::protocol::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
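This one-line swap is the headline change: the former `action` module is now `deltalake::protocol`, and the same mechanical rewrite recurs in every file below. As a quick migration reference for downstream crates, here is a before/after sketch of the import paths, collected from the diffs in this PR (the re-export comments assume the crate-root `pub use` lines added in rust/src/lib.rs further down):

```rust
// Old paths (pre-#1434):
// use deltalake::action::ProtocolError;
// use deltalake::builder::DeltaTableBuilder;
// use deltalake::table_state::DeltaTableState;
// use deltalake::delta_config::DeltaConfigKey;
// use deltalake::action::checkpoints;

// New paths (post-#1434):
use deltalake::protocol::ProtocolError;
use deltalake::table::builder::DeltaTableBuilder; // also re-exported as deltalake::DeltaTableBuilder
use deltalake::table::state::DeltaTableState;
use deltalake::table::config::DeltaConfigKey; // also re-exported as deltalake::DeltaConfigKey
use deltalake::protocol::checkpoints;
```

Items re-exported at the crate root, such as `DeltaOps` and `DeltaTable`, can still be imported directly from `deltalake`.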
python/src/lib.rs (9 additions, 9 deletions)
@@ -14,13 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, Duration, FixedOffset, Utc};
use deltalake::action::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::arrow::compute::concat_batches;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
@@ -30,6 +26,10 @@ use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::protocol::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, Invariant, Schema};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
@@ -497,7 +497,7 @@ impl RawDeltaTable {

let existing_schema = self._table.get_schema().map_err(PythonError::from)?;

let mut actions: Vec<action::Action> = add_actions
let mut actions: Vec<protocol::Action> = add_actions
.iter()
.map(|add| Action::add(add.into()))
.collect();
@@ -515,7 +515,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;

for old_add in add_actions {
let remove_action = Action::remove(action::Remove {
let remove_action = Action::remove(protocol::Remove {
path: old_add.path.clone(),
deletion_timestamp: Some(current_timestamp()),
data_change: true,
@@ -536,7 +536,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?
.clone();
metadata.schema = schema;
let metadata_action = action::MetaData::try_from(metadata)
let metadata_action = protocol::MetaData::try_from(metadata)
.map_err(|_| PyValueError::new_err("Failed to reparse metadata"))?;
actions.push(Action::metaData(metadata_action));
}
@@ -795,9 +795,9 @@ pub struct PyAddAction {
stats: Option<String>,
}

impl From<&PyAddAction> for action::Add {
impl From<&PyAddAction> for protocol::Add {
fn from(action: &PyAddAction) -> Self {
action::Add {
protocol::Add {
path: action.path.clone(),
size: action.size,
partition_values: action.partition_values.clone(),
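For Rust consumers doing what this binding does, the construction pattern is unchanged apart from the module name. A minimal sketch, assuming `protocol::Add` derives `Default` as its predecessor did (path and size are illustrative):

```rust
use deltalake::protocol::{self, Action};

// Wrap the renamed protocol structs in the Action enum before committing,
// mirroring the loop over `add_actions` above. The path and size are
// illustrative; the remaining fields keep their defaults.
fn to_actions() -> Vec<Action> {
    let add = protocol::Add {
        path: "part-00000-example-c000.snappy.parquet".to_string(),
        size: 1024,
        data_change: true,
        ..Default::default()
    };
    vec![Action::add(add)]
}
```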
rust/benches/read_checkpoint.rs (2 additions, 2 deletions)
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use deltalake::delta::DeltaTableConfig;
use deltalake::table_state::DeltaTableState;
use deltalake::table::state::DeltaTableState;
use deltalake::DeltaTableConfig;
use std::fs::File;
use std::io::Read;

rust/examples/basic_operations.rs (1 addition, 1 deletion)
@@ -4,7 +4,7 @@ use arrow::{
record_batch::RecordBatch,
};
use deltalake::operations::collect_sendable_stream;
use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use deltalake::{protocol::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
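The updated example boils down to the following round trip. A condensed sketch, assuming the `datafusion` feature (needed for `write`/`load`); schema and values are illustrative:

```rust
use std::sync::Arc;

use deltalake::arrow::array::Int32Array;
use deltalake::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::operations::collect_sendable_stream;
use deltalake::{protocol::SaveMode, DeltaOps};

async fn roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    // One-column batch to write.
    let schema = Arc::new(ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
        .expect("batch matches schema");

    // SaveMode now comes from deltalake::protocol instead of deltalake::action.
    let table = DeltaOps::new_in_memory()
        .write(vec![batch])
        .with_save_mode(SaveMode::Overwrite)
        .await?;

    // Read the data back and print it.
    let (_table, stream) = DeltaOps(table).load().await?;
    let batches = collect_sendable_stream(stream).await?;
    println!("{batches:?}");
    Ok(())
}
```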
rust/src/data_catalog/storage/mod.rs (2 additions, 1 deletion)
@@ -13,8 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::open_table_with_storage_options;
use crate::storage::config::{configure_store, StorageOptions};
use crate::{ensure_table_uri, open_table_with_storage_options};
use crate::table::builder::ensure_table_uri;

const DELTA_LOG_FOLDER: &str = "_delta_log";

rust/src/delta_datafusion.rs (6 additions, 6 deletions)
@@ -67,11 +67,11 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::ObjectMeta;
use url::Url;

use crate::action::{self, Add};
use crate::builder::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{self, Add};
use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType};

const PATH_COLUMN: &str = "__delta_rs_path";
@@ -124,7 +124,7 @@ impl DeltaTableState {
|acc, action| {
let new_stats = action
.get_stats()
.unwrap_or_else(|_| Some(action::Stats::default()))?;
.unwrap_or_else(|_| Some(protocol::Stats::default()))?;
Some(Statistics {
num_rows: acc
.num_rows
@@ -631,7 +631,7 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
}

pub(crate) fn partitioned_file_from_action(
action: &action::Add,
action: &protocol::Add,
schema: &ArrowSchema,
) -> PartitionedFile {
let partition_values = schema
@@ -1541,7 +1541,7 @@ mod tests {
let mut partition_values = std::collections::HashMap::new();
partition_values.insert("month".to_string(), Some("1".to_string()));
partition_values.insert("year".to_string(), Some("2015".to_string()));
let action = action::Add {
let action = protocol::Add {
path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
size: 10644,
partition_values,
rust/src/errors.rs (1 addition, 1 deletion)
@@ -1,8 +1,8 @@
//! Exceptions for the deltalake crate
use object_store::Error as ObjectStoreError;

use crate::action::ProtocolError;
use crate::operations::transaction::TransactionError;
use crate::protocol::ProtocolError;

/// A result returned by delta-rs
pub type DeltaResult<T> = Result<T, DeltaTableError>;
rust/src/lib.rs (72 additions, 22 deletions)
@@ -82,42 +82,34 @@ compile_error!(
"Features s3 and s3-native-tls are mutually exclusive and cannot be enabled together"
);

pub mod action;
pub mod builder;
pub mod data_catalog;
pub mod delta;
pub mod delta_config;
pub mod errors;
pub mod operations;
pub mod partitions;
pub mod protocol;
pub mod schema;
pub mod storage;
pub mod table_state;
pub mod time_utils;
pub mod table;

#[cfg(feature = "arrow")]
pub mod table_state_arrow;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod delta_arrow;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

pub use self::builder::*;
use std::collections::HashMap;

pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
pub use self::delta::*;
pub use self::delta_config::*;
pub use self::partitions::*;
pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use errors::*;
pub use self::table::builder::{
DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion,
};
pub use self::table::config::DeltaConfigKey;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;

// convenience exports for consumers to avoid aligning crate versions
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub use action::checkpoints;
#[cfg(feature = "arrow")]
pub use arrow;
#[cfg(feature = "datafusion")]
@@ -126,15 +118,70 @@ pub use datafusion;
pub use parquet;
#[cfg(feature = "parquet2")]
pub use parquet2;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub use protocol::checkpoints;

// needed only for integration tests
// TODO can / should we move this into the test crate?
#[cfg(feature = "integration_test")]
pub mod test_utils;

/// Creates and loads a DeltaTable from the given path with current metadata.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri).load().await?;
Ok(table)
}

/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
/// `StorageService`.
pub async fn open_table_with_storage_options(
table_uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_version(
table_uri: impl AsRef<str>,
version: i64,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_version(version)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path.
/// Loads metadata from the version that was current at the given ISO-8601/RFC-3339 timestamp.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_ds(
table_uri: impl AsRef<str>,
ds: impl AsRef<str>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_datestring(ds)?
.load()
.await?;
Ok(table)
}

/// Returns rust crate version, can be used in language bindings to expose Rust core version
pub fn crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::table::PeekCommit;
use std::collections::HashMap;

#[tokio::test]
@@ -153,7 +200,7 @@ mod tests {
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 4);
assert!(tombstones.contains(&crate::action::Remove {
assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1564524298213),
data_change: false,
@@ -255,7 +302,7 @@
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 1);
assert!(tombstones.contains(&crate::action::Remove {
assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1615043776198),
data_change: true,
@@ -475,7 +522,7 @@
let table_from_struct_stats = crate::open_table(table_uri).await.unwrap();
let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap();

fn get_stats_for_file(table: &crate::DeltaTable, file_name: &str) -> crate::action::Stats {
fn get_stats_for_file(
table: &crate::DeltaTable,
file_name: &str,
) -> crate::protocol::Stats {
table
.get_file_uris()
.zip(table.get_stats())
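These loaders replace the ones formerly exported from the old `delta` module. A usage sketch (the table path is hypothetical; any RFC 3339 timestamp works for `open_table_with_ds`):

```rust
use deltalake::errors::DeltaTableError;
use deltalake::{open_table, open_table_with_ds, open_table_with_version};

async fn load_examples() -> Result<(), DeltaTableError> {
    // Load the latest version; the storage backend is inferred from the
    // URI scheme (a local path here; s3:// and other schemes work too).
    let table = open_table("./path/to/table").await?;
    println!("latest version: {}", table.version());

    // Time travel by version number...
    let pinned = open_table_with_version("./path/to/table", 0).await?;
    println!("pinned version: {}", pinned.version());

    // ...or by RFC 3339 timestamp.
    let as_of = open_table_with_ds("./path/to/table", "2023-06-04T00:00:00Z").await?;
    println!("as-of version: {}", as_of.version());
    Ok(())
}
```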
rust/src/operations/create.rs (7 additions, 6 deletions)
@@ -9,13 +9,14 @@ use serde_json::{Map, Value};

use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::builder::ensure_table_uri;
use crate::delta_config::DeltaConfigKey;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
use crate::{DeltaTable, DeltaTableBuilder, DeltaTableMetaData};
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
use crate::table::DeltaTableMetaData;
use crate::{DeltaTable, DeltaTableBuilder};

#[derive(thiserror::Error, Debug)]
enum CreateError {
@@ -148,7 +149,7 @@ impl CreateBuilder {
///
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
///
/// If an object store is also passed using `with_object_store()` these options will be ignored.
@@ -322,8 +323,8 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
use crate::delta_config::DeltaConfigKey;
use crate::operations::DeltaOps;
use crate::table::config::DeltaConfigKey;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;

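For reference, a hedged sketch of driving `CreateBuilder` with the relocated `DeltaConfigKey`, mirroring the test setup in this file (table name, column, and property value are illustrative):

```rust
use deltalake::operations::DeltaOps;
use deltalake::table::config::DeltaConfigKey;
use deltalake::{DeltaTable, SchemaDataType};

async fn create_example() -> Result<DeltaTable, deltalake::errors::DeltaTableError> {
    // Create an in-memory table with one column and a table property set
    // through the relocated DeltaConfigKey enum.
    DeltaOps::new_in_memory()
        .create()
        .with_table_name("example")
        .with_column(
            "id",
            SchemaDataType::primitive("long".to_string()),
            false, // not nullable
            None,  // no column metadata
        )
        .with_configuration_property(DeltaConfigKey::AppendOnly, Some("true"))
        .await
}
```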
rust/src/operations/delete.rs (6 additions, 5 deletions)
@@ -20,7 +20,7 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use crate::action::{Action, Add, Remove};
use crate::protocol::{Action, Add, Remove};
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::filter::FilterExec;
@@ -33,13 +33,14 @@ use parquet::file::properties::WriterProperties;
use serde_json::Map;
use serde_json::Value;

use crate::action::DeltaOperation;
use crate::delta_datafusion::{find_files, parquet_scan_from_actions, register_store};
use crate::delta_datafusion::find_files;
use crate::delta_datafusion::{parquet_scan_from_actions, register_store};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
use crate::protocol::DeltaOperation;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

use super::datafusion_utils::Expression;
@@ -324,8 +325,8 @@ impl std::future::IntoFuture for DeleteBuilder {
#[cfg(test)]
mod tests {

use crate::action::*;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::DeltaTable;
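A sketch of invoking the delete operation whose imports were just updated, assuming the `datafusion` feature (the predicate is illustrative):

```rust
use deltalake::datafusion::prelude::{col, lit};
use deltalake::operations::DeltaOps;
use deltalake::DeltaTable;

async fn delete_old_rows(table: DeltaTable) -> Result<(), deltalake::errors::DeltaTableError> {
    // Delete every row matching the predicate; files that also contain
    // non-matching rows are rewritten rather than dropped wholesale.
    let (table, metrics) = DeltaOps(table)
        .delete()
        .with_predicate(col("id").eq(lit(100)))
        .await?;
    println!(
        "deleted {:?} rows; table is now at version {}",
        metrics.num_deleted_rows,
        table.version()
    );
    Ok(())
}
```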
rust/src/operations/filesystem_check.rs (2 additions, 2 deletions)
@@ -24,11 +24,11 @@ pub use object_store::path::Path;
use object_store::ObjectStore;
use url::{ParseError, Url};

use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::protocol::{Action, Add, DeltaOperation, Remove};
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

/// Audit the Delta Table's active files with the underlying file system.
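Finally, a sketch of running the audit implemented in this file (the dry-run flag is illustrative):

```rust
use deltalake::operations::DeltaOps;
use deltalake::DeltaTable;

async fn audit(table: DeltaTable) -> Result<(), deltalake::errors::DeltaTableError> {
    // Compare the log's active Add entries against the object store and,
    // when not a dry run, commit Remove actions for files that are missing.
    let (_table, metrics) = DeltaOps(table)
        .filesystem_check()
        .with_dry_run(true)
        .await?;
    println!("files that would be removed: {:?}", metrics.files_removed);
    Ok(())
}
```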