feat: clean up dependencies and feature flags (delta-io#1014)

# Description

~~This PR updates datafusion and related dependencies to their latest
versions. Since datafusion now has improved support for loading
partition columns with non-string types, we update our scan methods to
take advantage of that.~~

While working on dependencies, I took the opportunity to do some
housekeeping.

- do not use chrono with default features
- make the `aws_profile` feature from object_store optional. The upstream crate
  explicitly discourages its usage, and it brings in quite a few new
  dependencies, as it pulls in parts of the AWS SDK.
- rename the `datafusion-ext` feature to `datafusion`. The `-ext` suffix is
  left over from a time when there were fewer options for defining features. I
  kept the old feature around as an alias. A sketch of what this means for
  downstream manifests follows this list.
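
For consumers of the crate the practical upshot is a one-line change in
`Cargo.toml`. A minimal sketch (the version requirement is illustrative and
not taken from this commit):

```toml
[dependencies]
# New feature name; "datafusion-ext" remains as a deprecated alias that
# simply forwards to "datafusion", so existing manifests keep compiling.
deltalake = { version = "0", features = ["s3", "datafusion"] }
```

Because cargo features are additive, building with the old
`--features datafusion-ext` resolves to exactly the same code as
`--features datafusion`.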

# Related Issue(s)

closes delta-io#914

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
2 people authored and chitralverma committed Mar 17, 2023
1 parent 246bfc7 commit 4204ace
Showing 16 changed files with 79 additions and 57 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/build.yml
@@ -43,11 +43,13 @@ jobs:
           override: true
       - uses: Swatinem/rust-cache@v1
       - name: build and lint with clippy
-        run: cargo clippy --features azure,datafusion-ext,s3,gcs,glue
+        run: cargo clippy --features azure,datafusion,s3,gcs,glue
       - name: Spot-check build for rustls features
         run: cargo clippy --features s3-rustls
       - name: Check docs
-        run: cargo doc --features azure,datafusion-ext,s3,gcs,glue
+        run: cargo doc --features azure,datafusion,s3,gcs,glue
+      - name: Check no default features
+        run: cargo check --no-default-features

   test:
     strategy:
@@ -68,7 +70,7 @@ jobs:
           override: true
       - uses: Swatinem/rust-cache@v1
       - name: Run tests
-        run: cargo test --verbose --features datafusion-ext,azure
+        run: cargo test --verbose --features datafusion,azure

   integration_test:
     name: Integration Tests
@@ -107,10 +109,10 @@ jobs:

       - name: Run tests with default ssl
         run: |
-          cargo test --features integration_test,azure,s3,gcs,datafusion-ext
+          cargo test --features integration_test,azure,s3,gcs,datafusion
       - name: Run tests with rustls
         run: |
-          cargo test --features integration_test,s3-rustls,datafusion-ext
+          cargo test --features integration_test,s3-rustls,datafusion
   parquet2_test:
     runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -38,4 +38,4 @@ features = ["extension-module", "abi3", "abi3-py37"]
 [dependencies.deltalake]
 path = "../rust"
 version = "0"
-features = ["s3", "azure", "glue", "gcs", "python", "datafusion-ext"]
+features = ["s3", "azure", "glue", "gcs", "python", "datafusion"]
3 changes: 2 additions & 1 deletion python/pyproject.toml
@@ -34,7 +34,8 @@ devel = [
     "pytest-timeout",
     "sphinx<=4.5",
     "sphinx-rtd-theme",
-    "toml"
+    "toml",
+    "wheel"
 ]
 pyspark = [
     "pyspark",
14 changes: 9 additions & 5 deletions rust/Cargo.toml
@@ -13,7 +13,7 @@ edition = "2021"
 arrow = { version = "28", optional = true }
 async-trait = "0.1"
 bytes = "1"
-chrono = "0.4.22"
+chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
 cfg-if = "1"
 errno = "0.2"
 futures = "0.3"
@@ -22,7 +22,7 @@ log = "0"
 libc = ">=0.2.90, <1"
 num-bigint = "0.4"
 num-traits = "0.2.15"
-object_store = { version = "0.5.2", features = ["aws_profile"] }
+object_store = "0.5.2"
 once_cell = "1.16.0"
 parking_lot = "0.12"
 parquet = { version = "28", features = ["async"], optional = true }
@@ -77,14 +77,15 @@ glibc_version = { path = "../glibc_version", version = "0.1" }

 [features]
 default = ["arrow", "parquet"]
-datafusion-ext = [
-    "datafusion",
+datafusion = [
+    "dep:datafusion",
     "datafusion-expr",
     "datafusion-common",
     "datafusion-proto",
     "arrow",
     "parquet",
 ]
+datafusion-ext = ["datafusion"]
 azure = ["object_store/azure"]
 gcs = ["object_store/gcp"]
 s3 = [
@@ -94,6 +95,7 @@ s3 = [
     "rusoto_dynamodb/native-tls",
     "dynamodb_lock/native-tls",
     "object_store/aws",
+    "object_store/aws_profile",
 ]
 s3-rustls = [
     "rusoto_core/rustls",
@@ -102,9 +104,11 @@ s3-rustls = [
     "rusoto_dynamodb/rustls",
     "dynamodb_lock/rustls",
     "object_store/aws",
+    "object_store/aws_profile",
 ]
 glue = ["s3", "rusoto_glue"]
 python = ["arrow/pyarrow"]
+
 # used only for integration testing
 integration_test = ["fs_extra", "tempdir"]

@@ -114,4 +118,4 @@ harness = false

 [[example]]
 name = "basic_operations"
-required-features = ["datafusion-ext"]
+required-features = ["datafusion"]
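
The rename works because of the `dep:` syntax visible in the hunk above: since
Rust 1.60, a cargo feature may share its name with an optional dependency as
long as the feature refers to the dependency as `dep:<name>`. A minimal sketch
of the mechanism, not taken from this repository:

```toml
[dependencies]
# Illustrative version requirement.
datafusion = { version = "1", optional = true }

[features]
# Without "dep:", the feature name "datafusion" would collide with the
# implicit feature cargo derives from the optional dependency itself.
datafusion = ["dep:datafusion"]
```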
18 changes: 6 additions & 12 deletions rust/README.md
@@ -1,14 +1,11 @@
-Deltalake
-=========
+# Deltalake

 [![crates.io](https://img.shields.io/crates/v/deltalake.svg?style=flat-square)](https://crates.io/crates/deltalake)
 [![api_doc](https://img.shields.io/badge/doc-api-blue)](https://docs.rs/deltalake)

 Native Delta Lake implementation in Rust

-
-Usage
------
+## Usage

 ### API

@@ -17,7 +14,6 @@ let table = deltalake::open_table("./tests/data/simple_table").await.unwrap();
 println!("{}", table.get_files());
 ```

-
 ### CLI

 ```bash
@@ -43,20 +39,18 @@ Examples can be run using the `cargo run --example` command. For example:
 cargo run --example read_delta_table
 ```

-Optional cargo package features
------------------------
+## Optional cargo package features

 - `s3` - enable the S3 storage backend to work with Delta Tables in AWS S3.
 - `s3-rustls` - enable the S3 storage backend but rely on [rustls](https://github.com/ctz/rustls) rather than OpenSSL (`native-tls`).
 - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
 - `azure` - enable the Azure storage backend to work with Delta Tables in Azure Data Lake Storage Gen2 accounts.
 - `gcs` - enable the Google storage backend to work with Delta Tables in Google Cloud Storage.
-- `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+- `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+- `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
 - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.

-
-Development
------------
+## Development

 To run s3 integration tests from local machine, we use docker-compose to stand
 up AWS local stack. To spin up the test environment run `docker-compose up` in
7 changes: 7 additions & 0 deletions rust/src/action/mod.rs
@@ -233,6 +233,7 @@ impl Add {
     }

     /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
         match self.get_stats_parsed() {
             Ok(Some(stats)) => Ok(Some(stats)),
@@ -247,6 +248,12 @@ impl Add {
         }
     }

+    /// Get whatever stats are available.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
+        self.get_json_stats()
+    }
+
     /// Returns the serde_json representation of stats contained in the action if present.
     /// Since stats are defined as optional in the protocol, this may be None.
     pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
14 changes: 11 additions & 3 deletions rust/src/delta.rs
@@ -92,7 +92,7 @@ pub enum DeltaTableError {
     },

     /// Error returned when parsing checkpoint parquet.
-    // #[cfg(feature = "parquet")]
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     #[error("Failed to parse parquet: {}", .source)]
     Parquet {
         /// Parquet error details returned when reading the checkpoint failed.
@@ -676,6 +676,7 @@ impl DeltaTable {
         Ok(())
     }

+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
         self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;

@@ -787,6 +788,7 @@ impl DeltaTable {

     /// Updates the DeltaTable to the most recent state committed to the transaction log by
     /// loading the last checkpoint and incrementally applying each version since.
+    #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub async fn update(&mut self) -> Result<(), DeltaTableError> {
         match self.get_last_checkpoint().await {
             Ok(last_check_point) => {
@@ -803,6 +805,12 @@ impl DeltaTable {
         }
     }

+    /// Updates the DeltaTable to the most recent state committed to the transaction log.
+    #[cfg(not(any(feature = "parquet", feature = "parquet2")))]
+    pub async fn update(&mut self) -> Result<(), DeltaTableError> {
+        self.update_incremental().await
+    }
+
     /// Updates the DeltaTable to the latest version by incrementally applying newer versions.
     /// It assumes that the table is already updated to the current version `self.version`.
     pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
@@ -838,8 +846,9 @@ impl DeltaTable {
         }
     }

-        let mut next_version;
+        let mut next_version = 0;
         // 1. find latest checkpoint below version
+        #[cfg(any(feature = "parquet", feature = "parquet2"))]
         match self.find_latest_check_point_for_version(version).await? {
             Some(check_point) => {
                 self.restore_checkpoint(check_point).await?;
@@ -848,7 +857,6 @@ impl DeltaTable {
             None => {
                 // no checkpoint found, clear table state and start from the beginning
                 self.state = DeltaTableState::with_version(0);
-                next_version = 0;
             }
         }
24 changes: 11 additions & 13 deletions rust/src/lib.rs
@@ -41,8 +41,11 @@
 //! or Azure Blob Storage / Azure Data Lake Storage Gen2 (ADLS2). Use `s3-rustls` to use Rust TLS
 //! instead of native TLS implementation.
 //! - `glue` - enable the Glue data catalog to work with Delta Tables with AWS Glue.
-//! - `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation
+//! - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation
 //!   for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
+//! - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature.
+//! - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features
+//!   are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
 //!
 //! # Querying Delta Tables with Datafusion
 //!
@@ -64,23 +67,18 @@
 //!   .await.unwrap();
 //! };
 //! ```
-//!
-//! It's important to note that the DataFusion library is evolving quickly, often with breaking api
-//! changes, and this may cause compilation issues as a result. If you are having issues with the most
-//! recently released `delta-rs` you can set a specific branch or commit in your `Cargo.toml`.
-//!
-//! ```toml
-//! datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07bc2c754805f536fe1cd873dbe6adfc0a21cbb3" }
-//! ```

 #![deny(warnings)]
 #![deny(missing_docs)]

 #[cfg(all(feature = "parquet", feature = "parquet2"))]
 compile_error!(
-    "Feature parquet and parquet2 are mutually exclusive and cannot be enabled together"
+    "Features parquet and parquet2 are mutually exclusive and cannot be enabled together"
 );

+#[cfg(all(feature = "s3", feature = "s3-rustls"))]
+compile_error!("Features s3 and s3-rustls are mutually exclusive and cannot be enabled together");
+
 pub mod action;
 pub mod builder;
 pub mod data_catalog;
@@ -98,11 +96,11 @@ pub mod vacuum;
 pub mod checkpoints;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod delta_arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod delta_datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod operations;
-#[cfg(feature = "parquet")]
+#[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod optimize;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub mod writer;
@@ -117,7 +115,7 @@ pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
 // convenience exports for consumers to avoid aligning crate versions
 #[cfg(feature = "arrow")]
 pub use arrow;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion;
 #[cfg(all(feature = "arrow", feature = "parquet"))]
 pub use operations::DeltaOps;
16 changes: 8 additions & 8 deletions rust/src/operations/mod.rs
@@ -14,20 +14,20 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
 pub mod create;
 pub mod transaction;

-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use self::{load::LoadBuilder, write::WriteBuilder};
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use arrow::record_batch::RecordBatch;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub use datafusion::physical_plan::common::collect as collect_sendable_stream;

-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod load;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 pub mod write;
 // TODO the writer module does not actually depend on datafusion,
 // eventually we should consolidate with the record batch writer
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 mod writer;

 /// Maximum supported writer version
@@ -93,14 +93,14 @@ impl DeltaOps {
     }

     /// Load data from a DeltaTable
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn load(self) -> LoadBuilder {
         LoadBuilder::default().with_object_store(self.0.object_store())
     }

     /// Write data to Delta table
-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
         WriteBuilder::default()
3 changes: 3 additions & 0 deletions rust/src/operations/write.rs
@@ -285,6 +285,9 @@ impl std::future::IntoFuture for WriteBuilder {
         let schema = batches[0].schema();

         if let Ok(meta) = table.get_metadata() {
+            // NOTE the schema generated from the delta schema will have the delta field metadata included,
+            // so we need to compare the field names and datatypes instead.
+            // TODO update comparison logic, once we have column mappings supported.
             let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);

             if !schema_eq(curr_schema, schema.clone()) {
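
For context on the NOTE added above: because the Arrow schema derived from the
delta schema carries extra field metadata, plain schema equality would always
fail. A minimal sketch of a metadata-insensitive comparison (the real
`schema_eq` helper is private to delta-rs, so this is illustrative only):

```rust
use arrow::datatypes::SchemaRef;

// Compare only field names, data types, and nullability, deliberately
// ignoring the per-field metadata attached by the delta-to-arrow conversion.
fn schema_eq(a: SchemaRef, b: SchemaRef) -> bool {
    a.fields().len() == b.fields().len()
        && a.fields().iter().zip(b.fields().iter()).all(|(x, y)| {
            x.name() == y.name()
                && x.data_type() == y.data_type()
                && x.is_nullable() == y.is_nullable()
        })
}
```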
4 changes: 2 additions & 2 deletions rust/src/storage/mod.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use tokio::io::AsyncWrite;

 use crate::get_storage_backend;
-#[cfg(feature = "datafusion-ext")]
+#[cfg(feature = "datafusion")]
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -143,7 +143,7 @@ impl DeltaObjectStore {
         self.config.to_uri(&Path::from(""))
     }

-    #[cfg(feature = "datafusion-ext")]
+    #[cfg(feature = "datafusion")]
     /// generate a unique enough url to identify the store in datafusion.
     pub(crate) fn object_store_url(&self) -> ObjectStoreUrl {
         // we are certain, that the URL can be parsed, since