Skip to content

Commit

Permalink
refactor: re-organize top level modules (#1434)
Browse files Browse the repository at this point in the history
# Description

~This contains changes from #1432, will rebase once that's merged.~

This PR constitutes the bulk of re-organising our top level modules.
- move `DeltaTable*` structs into new `table` module
- move table configuration into `table` module
- move schema related modules into `schema` module
- rename `action` module to `protocol` - hoping to isolate everything
that can one day be the log kernel.

~It also removes the deprecated commit logic from `DeltaTable` and
updates call sites and tests accordingly.~

I am planning one more follow up, where I hope to make `transactions`
currently within `operations` a top level module. While the number of
touched files here is already massive, I want to do this in a follow up,
as it will also include some updates to the transactions itself, that
should be more carefully reviewed.

# Related Issue(s)

closes: #1136

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
roeap authored Sep 26, 2023
1 parent 56e1e87 commit 65179b6
Show file tree
Hide file tree
Showing 71 changed files with 359 additions and 369 deletions.
2 changes: 1 addition & 1 deletion python/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow_schema::ArrowError;
use deltalake::action::ProtocolError;
use deltalake::protocol::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
Expand Down
18 changes: 9 additions & 9 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, Duration, FixedOffset, Utc};
use deltalake::action::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::arrow::compute::concat_batches;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
Expand All @@ -30,6 +26,10 @@ use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::protocol::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, Invariant, Schema};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -497,7 +497,7 @@ impl RawDeltaTable {

let existing_schema = self._table.get_schema().map_err(PythonError::from)?;

let mut actions: Vec<action::Action> = add_actions
let mut actions: Vec<protocol::Action> = add_actions
.iter()
.map(|add| Action::add(add.into()))
.collect();
Expand All @@ -515,7 +515,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;

for old_add in add_actions {
let remove_action = Action::remove(action::Remove {
let remove_action = Action::remove(protocol::Remove {
path: old_add.path.clone(),
deletion_timestamp: Some(current_timestamp()),
data_change: true,
Expand All @@ -536,7 +536,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?
.clone();
metadata.schema = schema;
let metadata_action = action::MetaData::try_from(metadata)
let metadata_action = protocol::MetaData::try_from(metadata)
.map_err(|_| PyValueError::new_err("Failed to reparse metadata"))?;
actions.push(Action::metaData(metadata_action));
}
Expand Down Expand Up @@ -795,9 +795,9 @@ pub struct PyAddAction {
stats: Option<String>,
}

impl From<&PyAddAction> for action::Add {
impl From<&PyAddAction> for protocol::Add {
fn from(action: &PyAddAction) -> Self {
action::Add {
protocol::Add {
path: action.path.clone(),
size: action.size,
partition_values: action.partition_values.clone(),
Expand Down
4 changes: 2 additions & 2 deletions rust/benches/read_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use deltalake::delta::DeltaTableConfig;
use deltalake::table_state::DeltaTableState;
use deltalake::table::state::DeltaTableState;
use deltalake::DeltaTableConfig;
use std::fs::File;
use std::io::Read;

Expand Down
2 changes: 1 addition & 1 deletion rust/examples/basic_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::{
record_batch::RecordBatch,
};
use deltalake::operations::collect_sendable_stream;
use deltalake::{action::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use deltalake::{protocol::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
Expand Down
3 changes: 2 additions & 1 deletion rust/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::open_table_with_storage_options;
use crate::storage::config::{configure_store, StorageOptions};
use crate::{ensure_table_uri, open_table_with_storage_options};
use crate::table::builder::ensure_table_uri;

const DELTA_LOG_FOLDER: &str = "_delta_log";

Expand Down
12 changes: 6 additions & 6 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use object_store::ObjectMeta;
use url::Url;

use crate::action::{self, Add};
use crate::builder::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{self, Add};
use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType};

const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down Expand Up @@ -124,7 +124,7 @@ impl DeltaTableState {
|acc, action| {
let new_stats = action
.get_stats()
.unwrap_or_else(|_| Some(action::Stats::default()))?;
.unwrap_or_else(|_| Some(protocol::Stats::default()))?;
Some(Statistics {
num_rows: acc
.num_rows
Expand Down Expand Up @@ -631,7 +631,7 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
}

pub(crate) fn partitioned_file_from_action(
action: &action::Add,
action: &protocol::Add,
schema: &ArrowSchema,
) -> PartitionedFile {
let partition_values = schema
Expand Down Expand Up @@ -1541,7 +1541,7 @@ mod tests {
let mut partition_values = std::collections::HashMap::new();
partition_values.insert("month".to_string(), Some("1".to_string()));
partition_values.insert("year".to_string(), Some("2015".to_string()));
let action = action::Add {
let action = protocol::Add {
path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
size: 10644,
partition_values,
Expand Down
2 changes: 1 addition & 1 deletion rust/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Exceptions for the deltalake crate
use object_store::Error as ObjectStoreError;

use crate::action::ProtocolError;
use crate::operations::transaction::TransactionError;
use crate::protocol::ProtocolError;

/// A result returned by delta-rs
pub type DeltaResult<T> = Result<T, DeltaTableError>;
Expand Down
94 changes: 72 additions & 22 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,42 +82,34 @@ compile_error!(
"Features s3 and s3-native-tls are mutually exclusive and cannot be enabled together"
);

pub mod action;
pub mod builder;
pub mod data_catalog;
pub mod delta;
pub mod delta_config;
pub mod errors;
pub mod operations;
pub mod partitions;
pub mod protocol;
pub mod schema;
pub mod storage;
pub mod table_state;
pub mod time_utils;
pub mod table;

#[cfg(feature = "arrow")]
pub mod table_state_arrow;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod delta_arrow;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

pub use self::builder::*;
use std::collections::HashMap;

pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
pub use self::delta::*;
pub use self::delta_config::*;
pub use self::partitions::*;
pub use self::errors::*;
pub use self::schema::partitions::*;
pub use self::schema::*;
pub use errors::*;
pub use self::table::builder::{
DeltaTableBuilder, DeltaTableConfig, DeltaTableLoadOptions, DeltaVersion,
};
pub use self::table::config::DeltaConfigKey;
pub use self::table::DeltaTable;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
pub use operations::DeltaOps;

// convenience exports for consumers to avoid aligning crate versions
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub use action::checkpoints;
#[cfg(feature = "arrow")]
pub use arrow;
#[cfg(feature = "datafusion")]
Expand All @@ -126,15 +118,70 @@ pub use datafusion;
pub use parquet;
#[cfg(feature = "parquet2")]
pub use parquet2;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub use protocol::checkpoints;

// needed only for integration tests
// TODO can / should we move this into the test crate?
#[cfg(feature = "integration_test")]
pub mod test_utils;

/// Creates and loads a DeltaTable from the given path with current metadata.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table(table_uri: impl AsRef<str>) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri).load().await?;
Ok(table)
}

/// Same as `open_table`, but also accepts storage options to aid in building the table for a deduced
/// `StorageService`.
pub async fn open_table_with_storage_options(
table_uri: impl AsRef<str>,
storage_options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path and loads it with the metadata from the given version.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_version(
table_uri: impl AsRef<str>,
version: i64,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_version(version)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path.
/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_ds(
table_uri: impl AsRef<str>,
ds: impl AsRef<str>,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_datestring(ds)?
.load()
.await?;
Ok(table)
}

/// Returns rust crate version, can be use used in language bindings to expose Rust core version
pub fn crate_version() -> &'static str {
env!("CARGO_PKG_VERSION")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::table::PeekCommit;
use std::collections::HashMap;

#[tokio::test]
Expand All @@ -153,7 +200,7 @@ mod tests {
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 4);
assert!(tombstones.contains(&crate::action::Remove {
assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1564524298213),
data_change: false,
Expand Down Expand Up @@ -255,7 +302,7 @@ mod tests {
);
let tombstones = table.get_state().all_tombstones();
assert_eq!(tombstones.len(), 1);
assert!(tombstones.contains(&crate::action::Remove {
assert!(tombstones.contains(&crate::protocol::Remove {
path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
deletion_timestamp: Some(1615043776198),
data_change: true,
Expand Down Expand Up @@ -475,7 +522,10 @@ mod tests {
let table_from_struct_stats = crate::open_table(table_uri).await.unwrap();
let table_from_json_stats = crate::open_table_with_version(table_uri, 1).await.unwrap();

fn get_stats_for_file(table: &crate::DeltaTable, file_name: &str) -> crate::action::Stats {
fn get_stats_for_file(
table: &crate::DeltaTable,
file_name: &str,
) -> crate::protocol::Stats {
table
.get_file_uris()
.zip(table.get_stats())
Expand Down
13 changes: 7 additions & 6 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use serde_json::{Map, Value};

use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::builder::ensure_table_uri;
use crate::delta_config::DeltaConfigKey;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
use crate::{DeltaTable, DeltaTableBuilder, DeltaTableMetaData};
use crate::table::builder::ensure_table_uri;
use crate::table::config::DeltaConfigKey;
use crate::table::DeltaTableMetaData;
use crate::{DeltaTable, DeltaTableBuilder};

#[derive(thiserror::Error, Debug)]
enum CreateError {
Expand Down Expand Up @@ -148,7 +149,7 @@ impl CreateBuilder {
///
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
///
/// If an object store is also passed using `with_object_store()` these options will be ignored.
Expand Down Expand Up @@ -322,8 +323,8 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
use crate::delta_config::DeltaConfigKey;
use crate::operations::DeltaOps;
use crate::table::config::DeltaConfigKey;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;

Expand Down
11 changes: 6 additions & 5 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use crate::action::{Action, Add, Remove};
use crate::protocol::{Action, Add, Remove};
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::filter::FilterExec;
Expand All @@ -33,13 +33,14 @@ use parquet::file::properties::WriterProperties;
use serde_json::Map;
use serde_json::Value;

use crate::action::DeltaOperation;
use crate::delta_datafusion::{find_files, parquet_scan_from_actions, register_store};
use crate::delta_datafusion::find_files;
use crate::delta_datafusion::{parquet_scan_from_actions, register_store};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::operations::write::write_execution_plan;
use crate::protocol::DeltaOperation;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

use super::datafusion_utils::Expression;
Expand Down Expand Up @@ -324,8 +325,8 @@ impl std::future::IntoFuture for DeleteBuilder {
#[cfg(test)]
mod tests {

use crate::action::*;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::DeltaTable;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ pub use object_store::path::Path;
use object_store::ObjectStore;
use url::{ParseError, Url};

use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::operations::transaction::commit;
use crate::protocol::{Action, Add, DeltaOperation, Remove};
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

/// Audit the Delta Table's active files with the underlying file system.
Expand Down
Loading

0 comments on commit 65179b6

Please sign in to comment.