diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 8f23287773..adb28fec7b 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -43,11 +43,13 @@ jobs:
          override: true
      - uses: Swatinem/rust-cache@v1
      - name: build and lint with clippy
-        run: cargo clippy --features azure,datafusion-ext,s3,gcs,glue
+        run: cargo clippy --features azure,datafusion,s3,gcs,glue
      - name: Spot-check build for rustls features
        run: cargo clippy --features s3-rustls
      - name: Check docs
-        run: cargo doc --features azure,datafusion-ext,s3,gcs,glue
+        run: cargo doc --features azure,datafusion,s3,gcs,glue
+      - name: Check no default features
+        run: cargo check --no-default-features
 
  test:
    strategy:
@@ -68,7 +70,7 @@ jobs:
          override: true
      - uses: Swatinem/rust-cache@v1
      - name: Run tests
-        run: cargo test --verbose --features datafusion-ext,azure
+        run: cargo test --verbose --features datafusion,azure
 
  integration_test:
    name: Integration Tests
@@ -107,10 +109,10 @@ jobs:
 
      - name: Run tests with default ssl
        run: |
-          cargo test --features integration_test,azure,s3,gcs,datafusion-ext
+          cargo test --features integration_test,azure,s3,gcs,datafusion
      - name: Run tests with rustls
        run: |
-          cargo test --features integration_test,s3-rustls,datafusion-ext
+          cargo test --features integration_test,s3-rustls,datafusion
 
  parquet2_test:
    runs-on: ubuntu-latest
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 240cd35d72..2035b03a18 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -38,4 +38,4 @@ features = ["extension-module", "abi3", "abi3-py37"]
 [dependencies.deltalake]
 path = "../rust"
 version = "0"
-features = ["s3", "azure", "glue", "gcs", "python", "datafusion-ext"]
+features = ["s3", "azure", "glue", "gcs", "python", "datafusion"]
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 662cfeddc4..c66f70dc42 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -34,7 +34,8 @@ devel = [
    "pytest-timeout",
    "sphinx<=4.5",
    "sphinx-rtd-theme",
-    "toml"
+    "toml",
+    "wheel"
 ]
 pyspark = [
    "pyspark",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 9593cf2373..f04bd04e4d 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -13,7 +13,7 @@ edition = "2021"
 arrow = { version = "28", optional = true }
 async-trait = "0.1"
 bytes = "1"
-chrono = "0.4.22"
+chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
 cfg-if = "1"
 errno = "0.2"
 futures = "0.3"
@@ -22,7 +22,7 @@ log = "0"
 libc = ">=0.2.90, <1"
 num-bigint = "0.4"
 num-traits = "0.2.15"
-object_store = { version = "0.5.2", features = ["aws_profile"] }
+object_store = "0.5.2"
 once_cell = "1.16.0"
 parking_lot = "0.12"
 parquet = { version = "28", features = ["async"], optional = true }
@@ -77,14 +77,15 @@ glibc_version = { path = "../glibc_version", version = "0.1" }
 
 [features]
 default = ["arrow", "parquet"]
-datafusion-ext = [
-    "datafusion",
+datafusion = [
+    "dep:datafusion",
    "datafusion-expr",
    "datafusion-common",
    "datafusion-proto",
    "arrow",
    "parquet",
 ]
+datafusion-ext = ["datafusion"]
 azure = ["object_store/azure"]
 gcs = ["object_store/gcp"]
 s3 = [
@@ -94,6 +95,7 @@ s3 = [
    "rusoto_dynamodb/native-tls",
    "dynamodb_lock/native-tls",
    "object_store/aws",
+    "object_store/aws_profile",
 ]
 s3-rustls = [
    "rusoto_core/rustls",
@@ -102,9 +104,11 @@ s3-rustls = [
    "rusoto_dynamodb/rustls",
    "dynamodb_lock/rustls",
    "object_store/aws",
+    "object_store/aws_profile",
 ]
 glue = ["s3", "rusoto_glue"]
 python = ["arrow/pyarrow"]
+
 # used only for integration testing
 integration_test = ["fs_extra", "tempdir"]
 
@@ -114,4 +118,4 @@ harness = false
 
 [[example]]
 name = "basic_operations"
-required-features = ["datafusion-ext"]
+required-features = ["datafusion"]
diff --git a/rust/README.md b/rust/README.md
index b20bfda3f8..e7e0ff5dcd 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -1,14 +1,11 @@
-Deltalake
-=========
+# Deltalake
 
 [![crates.io](https://img.shields.io/crates/v/deltalake.svg?style=flat-square)](https://crates.io/crates/deltalake)
 [![api_doc](https://img.shields.io/badge/doc-api-blue)](https://docs.rs/deltalake)
 
 Native Delta Lake implementation in Rust
 
-
-Usage
------
+## Usage
 
 ### API
 
@@ -17,7 +14,6 @@ let table = deltalake::open_table("./tests/data/simple_table").await.unwrap();
 println!("{}", table.get_files());
 ```
 
-
 ### CLI
 
 ```bash
@@ -43,20 +39,18 @@ Examples can be run using the `cargo run --example` command. For example:
 cargo run --example read_delta_table
 ```
 
-Optional cargo package features
------------------------
+## Optional cargo package features
 
 - `s3` - enable the S3 storage backend to work with Delta Tables in AWS S3.
 - `s3-rustls` - enable the S3 storage backend but rely on [rustls](https://github.com/ctz/rustls) rather than OpenSSL (`native-tls`).
 - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
 - `azure` - enable the Azure storage backend to work with Delta Tables in Azure Data Lake Storage Gen2 accounts.
 - `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage.
-- `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+- `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+- `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
 - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
 
-
-Development
------------
+## Development
 
 To run s3 integration tests from local machine, we use docker-compose to stand
 up AWS local stack. To spin up the test environment run `docker-compose up` in
diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index 3e7b62fffb..8bc09ba8ed 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -233,6 +233,7 @@ impl Add {
    }
 
    /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
        match self.get_stats_parsed() {
            Ok(Some(stats)) => Ok(Some(stats)),
@@ -247,6 +248,12 @@ impl Add {
        }
    }
 
+    /// Get whatever stats are available.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
+        self.get_json_stats()
+    }
+
    /// Returns the serde_json representation of stats contained in the action if present.
    /// Since stats are defined as optional in the protocol, this may be None.
    pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 5a12496b66..478656bc3d 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -92,7 +92,7 @@ pub enum DeltaTableError {
    },
    /// Error returned when parsing checkpoint parquet.
-    // #[cfg(feature = "parquet")]
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    #[error("Failed to parse parquet: {}", .source)]
    Parquet {
        /// Parquet error details returned when reading the checkpoint failed.
@@ -676,6 +676,7 @@ impl DeltaTable {
        Ok(())
    }
 
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
        self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;
 
@@ -787,6 +788,7 @@ impl DeltaTable {
 
    /// Updates the DeltaTable to the most recent state committed to the transaction log by
    /// loading the last checkpoint and incrementally applying each version since.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    pub async fn update(&mut self) -> Result<(), DeltaTableError> {
        match self.get_last_checkpoint().await {
            Ok(last_check_point) => {
@@ -803,6 +805,12 @@ impl DeltaTable {
        }
    }
 
+    /// Updates the DeltaTable to the most recent state committed to the transaction log.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub async fn update(&mut self) -> Result<(), DeltaTableError> {
+        self.update_incremental().await
+    }
+
    /// Updates the DeltaTable to the latest version by incrementally applying newer versions.
    /// It assumes that the table is already updated to the current version `self.version`.
    pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
@@ -838,8 +846,9 @@ impl DeltaTable {
            }
        }
 
-        let mut next_version;
+        let mut next_version = 0;
        // 1. find latest checkpoint below version
+        #[cfg(any(feature = "parquet", feature = "parquet2"))]
        match self.find_latest_check_point_for_version(version).await? {
            Some(check_point) => {
                self.restore_checkpoint(check_point).await?;
@@ -848,7 +857,6 @@ impl DeltaTable {
            None => {
                // no checkpoint found, clear table state and start from the beginning
                self.state = DeltaTableState::with_version(0);
-                next_version = 0;
            }
        }
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 2d59daa10a..8c99bd4ea2 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -41,8 +41,11 @@
 //! or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS
 //! instead of native TLS implementation.
 //! - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
-//! - `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation
+//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
 //!   for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
+//! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features
+//!   are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
 //!
 //! # Querying Delta Tables with Datafusion
 //!
@@ -64,23 +67,18 @@
 //!     .await.unwrap();
 //! };
 //! ```
-//!
-//! It's important to note that the DataFusion library is evolving quickly, often with breaking api
-//! changes, and this may cause compilation issues as a result. If you are having issues with the most
-//! recently released `delta-rs` you can set a specific branch or commit in your `Cargo.toml`.
-//!
-//! ```toml
-//! datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07bc2c754805f536fe1cd873dbe6adfc0a21cbb3" }
-//! ```
 
 #![deny(warnings)]
 #![deny(missing_docs)]
 
 #[cfg(all(feature = "parquet", feature = "parquet2"))]
 compile_error!(
-    "Feature parquet and parquet2 are mutually exclusive and cannot be enabled together"
+    "Features parquet and parquet2 are mutually exclusive and cannot be enabled together"
 );
 
+#[cfg(all(feature = "s3", feature = "s3-rustls"))]
+compile_error!("Features s3 and s3-rustls are mutually exclusive and cannot be enabled together");
+
 pub mod action;
 pub mod builder;
 pub mod data_catalog;
@@ -98,11 +96,11 @@ pub mod vacuum;
 pub mod checkpoints;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod delta_arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod delta_datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod operations;
-#[cfg(feature = "parquet")]
+#[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod optimize;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod writer;
@@ -117,7 +115,7 @@ pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, Object
 // convenience exports for consumers to avoid aligning crate versions
 #[cfg(feature = "arrow")]
 pub use arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub use operations::DeltaOps;
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index db9ae818a1..fb0d79198e 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -14,20 +14,20 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
 pub mod create;
 pub mod transaction;
 
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use self::{load::LoadBuilder, write::WriteBuilder};
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use arrow::record_batch::RecordBatch;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
 
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod load;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod write;
 // TODO the writer module does not actually depend on datafusion,
 // eventually we should consolidate with the record batch writer
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod writer;
 
 /// Maximum supported writer version
@@ -93,14 +93,14 @@ impl DeltaOps {
    }
 
    /// Load data from a DeltaTable
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
    #[must_use]
    pub fn load(self) -> LoadBuilder {
        LoadBuilder::default().with_object_store(self.0.object_store())
    }
 
    /// Write data to Delta table
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
    #[must_use]
    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
        WriteBuilder::default()
diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 9ce9bf5b0a..15ca82d6fd 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -285,6 +285,9 @@ impl std::future::IntoFuture for WriteBuilder {
            let schema = batches[0].schema();
 
            if let Ok(meta) = table.get_metadata() {
+                // NOTE the schema generated from the delta schema will have the delta field metadata included,
+                // so we need to compare the field names and datatypes instead.
+                // TODO update comparison logic, once we have column mappings supported.
                let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
 
                if !schema_eq(curr_schema, schema.clone()) {
diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs
index 9aee1203ee..c7531fc880 100644
--- a/rust/src/storage/mod.rs
+++ b/rust/src/storage/mod.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use tokio::io::AsyncWrite;
 
 use crate::get_storage_backend;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -143,7 +143,7 @@ impl DeltaObjectStore {
        self.config.to_uri(&Path::from(""))
    }
 
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
    /// generate a unique enough url to identify the store in datafusion.
    pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
        // we are certain, that the URL can be parsed, since
diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 0d39926655..7dcc776099 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -1,8 +1,7 @@
 //! The module for delta table state.
 
 use super::{
-    ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable,
-    DeltaTableConfig, DeltaTableError, DeltaTableMetaData,
+    ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableMetaData,
 };
 use crate::action::{self, Action};
 use crate::delta_config;
@@ -15,6 +14,9 @@ use std::collections::HashSet;
 use std::convert::TryFrom;
 use std::io::{BufRead, BufReader, Cursor};
 
+#[cfg(any(feature = "parquet", feature = "parquet2"))]
+use super::{CheckPoint, DeltaTableConfig, DeltaTableError};
+
 /// State snapshot currently held by the Delta Table instance.
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -83,6 +85,7 @@ impl DeltaTableState {
    }
 
    /// Update DeltaTableState with checkpoint data.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    pub fn process_checkpoint_bytes(
        &mut self,
        data: bytes::Bytes,
@@ -133,6 +136,7 @@ impl DeltaTableState {
    }
 
    /// Construct a delta table state object from checkpoint.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
    pub async fn from_checkpoint(
        table: &DeltaTable,
        check_point: &CheckPoint,
diff --git a/rust/src/time_utils.rs b/rust/src/time_utils.rs
index f5ef17bc1a..407185b927 100644
--- a/rust/src/time_utils.rs
+++ b/rust/src/time_utils.rs
@@ -1,8 +1,9 @@
 //! Utility functions for converting time formats.
+#![allow(unused)]
 #[cfg(feature = "arrow")]
 use arrow::temporal_conversions;
-#[cfg(not(feature = "parquet2"))]
+#[cfg(feature = "parquet")]
 use parquet::basic::TimeUnit;
 
 #[cfg(feature = "parquet2")]
 use parquet2::schema::types::TimeUnit;
@@ -84,7 +85,7 @@ pub fn timestamp_micros_from_stats_string(s: &str) -> Result<i64, chrono::forma
 }
 
 /// Convert the timestamp to a ISO-8601 style format suitable for JSON statistics.
-#[cfg(not(feature = "parquet2"))]
+#[cfg(feature = "parquet")]
 pub fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option<String> {
    let dt = match time_unit {
        TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n),
diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs
index 8710c97858..37118d4ad5 100644
--- a/rust/tests/common/mod.rs
+++ b/rust/tests/common/mod.rs
@@ -16,7 +16,7 @@ use tempdir::TempDir;
 #[cfg(feature = "azure")]
 pub mod adls;
 pub mod clock;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod datafusion;
 #[cfg(any(feature = "s3", feature = "s3-rustls"))]
 pub mod s3;
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 293acbc28b..7e1534ceec 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -1,4 +1,4 @@
-#![cfg(feature = "datafusion-ext")]
+#![cfg(feature = "datafusion")]
 
 use std::collections::HashSet;
 use std::path::PathBuf;
diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs
index 7633f4f4b8..9fb9e92125 100644
--- a/rust/tests/integration_datafusion.rs
+++ b/rust/tests/integration_datafusion.rs
@@ -1,4 +1,4 @@
-#![cfg(all(feature = "integration_test", feature = "datafusion-ext"))]
+#![cfg(all(feature = "integration_test", feature = "datafusion"))]
 
 use arrow::array::Int64Array;
 use common::datafusion::context_with_delta_table_factory;
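
For downstream crates, the rename is source-compatible: `datafusion-ext` survives as the deprecated alias `datafusion-ext = ["datafusion"]`, so only the feature list in `Cargo.toml` needs updating. A minimal consumer sketch follows — it is not part of this diff, and it assumes `deltalake` and `tokio` as dependencies; the table path is the repo's bundled test table and the registered name `demo` is arbitrary:

```rust
// Cargo.toml (illustrative):
//   deltalake = { version = "0", features = ["datafusion"] }  // was: "datafusion-ext"
//   tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
use std::sync::Arc;

// `deltalake` re-exports `datafusion` (the `pub use datafusion;` hunk above),
// so consumers need not align the datafusion crate version themselves.
use deltalake::datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open an existing Delta table from the repo's test data.
    let table = deltalake::open_table("./tests/data/simple_table").await?;

    // With the `datafusion` feature enabled, DeltaTable implements
    // datafusion's TableProvider and can be registered directly.
    let ctx = SessionContext::new();
    ctx.register_table("demo", Arc::new(table))?;

    let batches = ctx
        .sql("SELECT * FROM demo LIMIT 10")
        .await?
        .collect()
        .await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```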