From 2fccbdb849d8b1cfc243e9b0e974456bdd0c0906 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 26 Jul 2023 10:19:41 -0700 Subject: [PATCH 01/16] Start python api docs --- docs/python_api.md | 33 ++++++++++++ docs/requirements.txt | 3 ++ docs/usage/examining-table.md | 23 ++++----- docs/usage/index.md | 2 +- docs/usage/loading-table.md | 14 ++---- mkdocs.yml | 24 ++++++++- python/deltalake/_internal.pyi | 92 ++++++++++------------------------ python/src/filesystem.rs | 6 +-- python/src/lib.rs | 4 +- python/src/schema.rs | 12 ++--- 10 files changed, 112 insertions(+), 101 deletions(-) create mode 100644 docs/python_api.md create mode 100644 docs/requirements.txt diff --git a/docs/python_api.md b/docs/python_api.md new file mode 100644 index 0000000000..4837122c43 --- /dev/null +++ b/docs/python_api.md @@ -0,0 +1,33 @@ +# Python API Reference + +## DeltaTable + +::: deltalake.table + +## Writing Delta Tables + +::: deltalake.write_deltalake + +## Delta Lake Schemas + +Schemas, fields, and data types are provided in the ``deltalake.schema`` submodule. + +::: deltalake.schema.Schema + +::: deltalake.schema.PrimitiveType + +::: deltalake.schema.ArrayType + +::: deltalake.schema.MapType + +::: deltalake.schema.Field + +::: deltalake.schema.StructType + +## Data Catalog + +::: deltalake.data_catalog + +## Delta Storage Handler + +::: deltalake.fs diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000000..608b198e25 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,3 @@ +mkdocs +mkdocstrings[python] +mkdocs-autorefs \ No newline at end of file diff --git a/docs/usage/examining-table.md b/docs/usage/examining-table.md index 5641a926b3..c4cbfb0836 100644 --- a/docs/usage/examining-table.md +++ b/docs/usage/examining-table.md @@ -14,7 +14,7 @@ The delta log maintains basic metadata about a table, including: to have data deleted from it. Get metadata from a table with the -`DeltaTable.metadata` method: +[DeltaTable.metadata()][] method: ``` python >>> from deltalake import DeltaTable @@ -27,12 +27,12 @@ Metadata(id: 5fba94ed-9794-4965-ba6e-6ee3c0d22af9, name: None, description: None The schema for the table is also saved in the transaction log. It can either be retrieved in the Delta Lake form as -`deltalake.schema.Schema` or as a +[deltalake.schema.Schema][] or as a PyArrow schema. The first allows you to introspect any column-level metadata stored in the schema, while the latter represents the schema the table will be loaded into. -Use `DeltaTable.schema` to retrieve the delta lake schema: +Use [DeltaTable.schema][] to retrieve the delta lake schema: ``` python >>> from deltalake import DeltaTable @@ -43,14 +43,14 @@ Schema([Field(id, PrimitiveType("long"), nullable=True)]) These schemas have a JSON representation that can be retrieved. To reconstruct from json, use -`deltalake.schema.Schema.from_json()`. +[deltalake.schema.Schema.from_json()][]. ``` python >>> dt.schema().json() '{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}' ``` -Use `deltalake.schema.Schema.to_pyarrow()` to retrieve the PyArrow schema: +Use [deltalake.schema.Schema.to_pyarrow()][] to retrieve the PyArrow schema: ``` python >>> dt.schema().to_pyarrow() @@ -65,15 +65,12 @@ table, when, and by whom. This information is retained for 30 days by default, unless otherwise specified by the table configuration `delta.logRetentionDuration`. -::: note -::: title -Note -::: +!!! 
note + + This information is not written by all writers and different writers may + use different schemas to encode the actions. For Spark\'s format, see: + -This information is not written by all writers and different writers may -use different schemas to encode the actions. For Spark\'s format, see: - -::: To view the available history, use `DeltaTable.history`: diff --git a/docs/usage/index.md b/docs/usage/index.md index c765f7b5df..5f9624653a 100644 --- a/docs/usage/index.md +++ b/docs/usage/index.md @@ -1,6 +1,6 @@ # Usage -A `DeltaTable` represents the state of a +A [DeltaTable][] represents the state of a delta table at a particular version. This includes which files are currently part of the table, the schema of the table, and other metadata such as creation time. diff --git a/docs/usage/loading-table.md b/docs/usage/loading-table.md index 26d78a593f..1af7df95e5 100644 --- a/docs/usage/loading-table.md +++ b/docs/usage/loading-table.md @@ -109,12 +109,8 @@ version number or datetime string: >>> dt.load_with_datetime("2021-11-04 00:05:23.283+00:00") ``` -::: warning -::: title -Warning -::: - -Previous table versions may not exist if they have been vacuumed, in -which case an exception will be thrown. See [Vacuuming -tables](#vacuuming-tables) for more information. -::: \ No newline at end of file +!!! warning + + Previous table versions may not exist if they have been vacuumed, in + which case an exception will be thrown. See [Vacuuming + tables](#vacuuming-tables) for more information. diff --git a/mkdocs.yml b/mkdocs.yml index b6cf710863..dd25578b4e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -10,4 +10,26 @@ nav: - Examining a Delta Table: usage/examining-table.md - Querying a Delta Table: usage/querying-delta-tables.md - Managing a Delta Table: usage/managing-tables.md - - Writing Delta Tables: usage/writing-delta-tables.md \ No newline at end of file + - Writing Delta Tables: usage/writing-delta-tables.md + - API Reference: python_api.md + +plugins: +- autorefs +- mkdocstrings: + handlers: + python: + path: [../python] + rendering: + heading_level: 4 + show_source: false + show_symbol_type_in_heading: true + show_signature_annotations: true + show_root_heading: true + members_order: source + import: + # for cross references + - https://arrow.apache.org/docs/objects.inv + - https://pandas.pydata.org/docs/objects.inv + +markdown_extensions: + - admonition \ No newline at end of file diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 48da1d47df..5ba3b2b077 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -13,23 +13,22 @@ from deltalake.writer import AddAction __version__: str -RawDeltaTable: Any -rust_core_version: Callable[[], str] - -write_new_deltalake: Callable[ - [ - str, - pa.Schema, - List[AddAction], - str, - List[str], - Optional[str], - Optional[str], - Optional[Mapping[str, Optional[str]]], - Optional[Dict[str, str]], - ], - None, -] +class RawDeltaTable: + ... + +def rust_core_version() -> str: ... + +def write_new_deltalake( + table_uri: str, + schema: pa.Schema, + add_actions: List[AddAction], + _mode: str, + partition_by: List[str], + name: Optional[str], + description: Optional[str], + configuration: Optional[Mapping[str, Optional[str]]], + storage_options: Optional[Dict[str, str]], +): ... def batch_distinct(batch: pa.RecordBatch) -> pa.RecordBatch: ... 
@@ -93,34 +92,21 @@ class Field: *, nullable: bool = True, metadata: Optional[Dict[str, Any]] = None, - ) -> None: - """A named field, with a data type, nullability, and optional metadata.""" + ) -> None: ... name: str - """The field name.""" type: DataType - """The field data type.""" nullable: bool - """The field nullability.""" metadata: Dict[str, Any] - """The field metadata.""" - def to_json(self) -> str: - """Get the JSON representation of the Field. + def to_json(self) -> str: ... - :rtype: str - """ @staticmethod - def from_json(json: str) -> "Field": - """Create a new Field from a JSON string. + def from_json(json: str) -> "Field": ... + + def to_pyarrow(self) -> pa.Field: ... - :param json: A json string representing the Field. - :rtype: Field - """ - def to_pyarrow(self) -> pa.Field: - """Convert field to a pyarrow.Field.""" @staticmethod - def from_pyarrow(type: pa.Field) -> "Field": - """Create a new field from pyarrow.Field.""" + def from_pyarrow(type: pa.Field) -> "Field": ... class StructType: def __init__(self, fields: List[Field]) -> None: ... @@ -138,41 +124,15 @@ class Schema: def __init__(self, fields: List[Field]) -> None: ... fields: List[Field] invariants: List[Tuple[str, str]] - """The list of invariants defined on the table. - - The first string in each tuple is the field path, the second is the SQL of the invariant. - """ - def to_json(self) -> str: - """Get the JSON representation of the schema. + def to_json(self) -> str: ... - :rtype: str - """ @staticmethod - def from_json(json: str) -> "Schema": - """Create a new Schema from a JSON string. - - :param schema_json: a JSON string - :rtype: Schema - """ - def to_pyarrow(self, as_large_types: bool = False) -> pa.Schema: - """Return equivalent PyArrow schema. - - Note: this conversion is lossy as the Invariants are not stored in pyarrow.Schema. + def from_json(json: str) -> "Schema": ... + def to_pyarrow(self, as_large_types: bool = False) -> pa.Schema: ... - :param as_large_types: get schema with all variable size types (list, - binary, string) as large variants (with int64 indices). This is for - compatibility with systems like Polars that only support the large - versions of Arrow types. - :rtype: pyarrow.Schema - """ @staticmethod - def from_pyarrow(type: pa.Schema) -> "Schema": - """Create a new Schema from a pyarrow.Schema. - - :param data_type: a PyArrow schema - :rtype: Schema - """ + def from_pyarrow(type: pa.Schema) -> "Schema": ... class ObjectInputFile: @property diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 176045cdcc..d7387ce099 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -19,7 +19,7 @@ pub(crate) struct FsConfig { pub(crate) options: HashMap, } -#[pyclass(subclass)] +#[pyclass(subclass, module = "deltalake._internal")] #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, @@ -265,7 +265,7 @@ impl DeltaFileSystemHandler { // TODO the C++ implementation track an internal lock on all random access files, DO we need this here? // TODO add buffer to store data ... -#[pyclass(weakref)] +#[pyclass(weakref, module = "deltalake._internal")] #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, @@ -433,7 +433,7 @@ impl ObjectInputFile { // TODO the C++ implementation track an internal lock on all random access files, DO we need this here? // TODO add buffer to store data ... 
-#[pyclass(weakref)] +#[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { store: Arc, rt: Arc, diff --git a/python/src/lib.rs b/python/src/lib.rs index a78978cff1..a5472114ee 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -49,7 +49,7 @@ enum PartitionFilterValue<'a> { Multiple(Vec<&'a str>), } -#[pyclass] +#[pyclass(module = "deltalake._internal")] struct RawDeltaTable { _table: deltalake::DeltaTable, // storing the config additionally on the table helps us make pickling work. @@ -807,7 +807,7 @@ fn write_new_deltalake( Ok(()) } -#[pyclass(name = "DeltaDataChecker")] +#[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")] struct PyDeltaDataChecker { inner: DeltaDataChecker, rt: tokio::runtime::Runtime, diff --git a/python/src/schema.rs b/python/src/schema.rs index 43d386a617..425d775a9e 100644 --- a/python/src/schema.rs +++ b/python/src/schema.rs @@ -113,7 +113,7 @@ fn python_type_to_schema(ob: PyObject, py: Python) -> PyResult { /// * "decimal(, )" /// /// :param data_type: string representation of the data type -#[pyclass(module = "deltalake.schema")] +#[pyclass(module = "deltalake._internal")] #[derive(Clone)] pub struct PrimitiveType { inner_type: String, @@ -247,7 +247,7 @@ impl PrimitiveType { /// ArrayType(PrimitiveType("integer"), contains_null=True) /// >>> ArrayType("integer", contains_null=False) /// ArrayType(PrimitiveType("integer"), contains_null=False) -#[pyclass(module = "deltalake.schema")] +#[pyclass(module = "deltalake._internal")] #[derive(Clone)] pub struct ArrayType { inner_type: SchemaTypeArray, @@ -409,7 +409,7 @@ impl ArrayType { /// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True) /// >>> MapType("integer", "string", value_contains_null=False) /// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False) -#[pyclass(module = "deltalake.schema")] +#[pyclass(module = "deltalake._internal")] #[derive(Clone)] pub struct MapType { inner_type: SchemaTypeMap, @@ -592,7 +592,7 @@ impl MapType { /// /// >>> Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}}) /// Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}}) -#[pyclass(module = "deltalake.schema")] +#[pyclass(module = "deltalake._internal")] #[derive(Clone)] pub struct Field { inner: SchemaField, @@ -770,7 +770,7 @@ impl Field { /// /// >>> StructType([Field("x", "integer"), Field("y", "string")]) /// StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) -#[pyclass(subclass, module = "deltalake.schema")] +#[pyclass(subclass, module = "deltalake._internal")] #[derive(Clone)] pub struct StructType { inner_type: SchemaTypeStruct, @@ -943,7 +943,7 @@ pub fn schema_to_pyobject(schema: &Schema, py: Python) -> PyResult { /// >>> import pyarrow as pa /// >>> Schema.from_pyarrow(pa.schema({"x": pa.int32(), "y": pa.string()})) /// Schema([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)]) -#[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema")] +#[pyclass(extends = StructType, name = "Schema", module = "deltalake._internal")] pub struct PySchema; #[pymethods] From 6cb9c8acef4dda5b2ab1ad59f81eccb1dd7c454a Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Wed, 27 Sep 2023 18:21:37 -0700 Subject: [PATCH 02/16] fix: update the delta-inspect CLI to be build again by Cargo This sort of withered on the vine a bit, this pull request allows it to be built properly again --- Cargo.toml | 4 ++-- delta-inspect/Cargo.toml | 4 ++-- delta-inspect/src/main.rs | 25 +++++++++++++------------ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3015ce4bdd..a3e783cc76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] -members = ["rust", "python"] -exclude = ["proofs", "delta-inspect"] +members = ["delta-inspect", "rust", "python"] +exclude = ["proofs"] resolver = "2" [profile.release-with-debug] diff --git a/delta-inspect/Cargo.toml b/delta-inspect/Cargo.toml index 371f157837..8b37bd5e9b 100644 --- a/delta-inspect/Cargo.toml +++ b/delta-inspect/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" [dependencies] anyhow = "1" +chrono = { workspace = true, default-features = false, features = ["clock"] } clap = { version = "3", features = ["color"] } tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] } env_logger = "0" @@ -19,8 +20,7 @@ path = "../rust" version = "0" features = ["azure", "gcs"] - [features] default = ["rustls"] native-tls = ["deltalake/s3-native-tls", "deltalake/glue-native-tls"] -rustls = ["deltalake/s3", "deltalake/glue"] \ No newline at end of file +rustls = ["deltalake/s3", "deltalake/glue"] diff --git a/delta-inspect/src/main.rs b/delta-inspect/src/main.rs index 98b431c498..86a7b499bd 100644 --- a/delta-inspect/src/main.rs +++ b/delta-inspect/src/main.rs @@ -1,3 +1,4 @@ +use chrono::Duration; use clap::{App, AppSettings, Arg}; #[tokio::main(flavor = "current_thread")] @@ -79,21 +80,21 @@ async fn main() -> anyhow::Result<()> { Some(("vacuum", vacuum_matches)) => { let dry_run = !vacuum_matches.is_present("no_dry_run"); let table_uri = vacuum_matches.value_of("uri").unwrap(); - let mut table = deltalake::open_table(table_uri).await?; - let files = table - .vacuum( - vacuum_matches.value_of("retention_hours").map(|s| { - s.parse::() - .expect("retention hour should be an unsigned integer") - }), - dry_run, - true - ) + let table = deltalake::open_table(table_uri).await?; + let retention = vacuum_matches + .value_of("retention_hours") + .map(|s| s.parse::().unwrap()) + .unwrap(); + let (_table, metrics) = deltalake::operations::DeltaOps(table) + .vacuum() + .with_retention_period(Duration::hours(retention)) + .with_dry_run(dry_run) .await?; + if dry_run { - println!("Files to deleted: {files:#?}"); + println!("Files to deleted: {metrics:#?}"); } else { - println!("Files deleted: {files:#?}"); + println!("Files deleted: {metrics:#?}"); } } _ => unreachable!(), From 7ca17cd05ce7220890f1ed08bd14ddb1f95b1457 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 28 Sep 2023 17:52:32 -0700 Subject: [PATCH 03/16] chore: bump the version of the Rust crate --- rust/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 8e69efa789..2f31e8b915 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.16.0" +version = "0.17.0" rust-version = "1.64" authors = ["Qingping Hou "] homepage = "https://github.com/delta-io/delta.rs" From 13e8be18d3576328e6678f0c002597ff55d49784 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Thu, 28 Sep 2023 17:52:55 -0700 Subject: [PATCH 04/16] fix: unify environment variables referenced by Databricks docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long-term fix will be for Databricks to release a Rust SDK for Unity 😄 Fixes #1627 --- rust/src/data_catalog/unity/mod.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/rust/src/data_catalog/unity/mod.rs b/rust/src/data_catalog/unity/mod.rs index 3110f07020..e9de725923 100644 --- a/rust/src/data_catalog/unity/mod.rs +++ b/rust/src/data_catalog/unity/mod.rs @@ -84,16 +84,27 @@ pub enum UnityCatalogConfigKey { /// - `unity_workspace_url` /// - `databricks_workspace_url` /// - `workspace_url` + #[deprecated(since = "0.17.0", note = "Please use the DATABRICKS_HOST env variable")] WorkspaceUrl, + /// Host of the Databricks workspace + Host, + /// Access token to authorize API requests /// /// Supported keys: /// - `unity_access_token` /// - `databricks_access_token` /// - `access_token` + #[deprecated( + since = "0.17.0", + note = "Please use the DATABRICKS_TOKEN env variable" + )] AccessToken, + /// Token to use for Databricks Unity + Token, + /// Service principal client id for authorizing requests /// /// Supported keys: @@ -167,6 +178,7 @@ pub enum UnityCatalogConfigKey { impl FromStr for UnityCatalogConfigKey { type Err = DataCatalogError; + #[allow(deprecated)] fn from_str(s: &str) -> Result { match s { "access_token" | "unity_access_token" | "databricks_access_token" => { @@ -187,6 +199,7 @@ impl FromStr for UnityCatalogConfigKey { "federated_token_file" | "unity_federated_token_file" | "databricks_federated_token_file" => Ok(UnityCatalogConfigKey::FederatedTokenFile), + "host" => Ok(UnityCatalogConfigKey::Host), "msi_endpoint" | "unity_msi_endpoint" | "databricks_msi_endpoint" => { Ok(UnityCatalogConfigKey::MsiEndpoint) } @@ -196,6 +209,7 @@ impl FromStr for UnityCatalogConfigKey { "object_id" | "unity_object_id" | "databricks_object_id" => { Ok(UnityCatalogConfigKey::ObjectId) } + "token" => Ok(UnityCatalogConfigKey::Token), "use_azure_cli" | "unity_use_azure_cli" | "databricks_use_azure_cli" => { Ok(UnityCatalogConfigKey::UseAzureCli) } @@ -207,6 +221,7 @@ impl FromStr for UnityCatalogConfigKey { } } +#[allow(deprecated)] impl AsRef for UnityCatalogConfigKey { fn as_ref(&self) -> &str { match self { @@ -216,10 +231,12 @@ impl AsRef for UnityCatalogConfigKey { UnityCatalogConfigKey::ClientId => "unity_client_id", UnityCatalogConfigKey::ClientSecret => "unity_client_secret", UnityCatalogConfigKey::FederatedTokenFile => "unity_federated_token_file", + UnityCatalogConfigKey::Host => "databricks_host", UnityCatalogConfigKey::MsiEndpoint => "unity_msi_endpoint", UnityCatalogConfigKey::MsiResourceId => "unity_msi_resource_id", UnityCatalogConfigKey::ObjectId => "unity_object_id", UnityCatalogConfigKey::UseAzureCli => "unity_use_azure_cli", + UnityCatalogConfigKey::Token => "databricks_token", UnityCatalogConfigKey::WorkspaceUrl => "unity_workspace_url", } } @@ -268,6 +285,7 @@ pub struct UnityCatalogBuilder { client_options: super::client::ClientOptions, } +#[allow(deprecated)] impl UnityCatalogBuilder { /// Create a new [`UnityCatalogBuilder`] with default values. pub fn new() -> Self { @@ -281,19 +299,21 @@ impl UnityCatalogBuilder { value: impl Into, ) -> DataCatalogResult { match UnityCatalogConfigKey::from_str(key.as_ref())? 
{ - UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()), UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()), UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()), UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()), UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()), UnityCatalogConfigKey::AuthorityHost => self.authority_host = Some(value.into()), + UnityCatalogConfigKey::Host => self.workspace_url = Some(value.into()), UnityCatalogConfigKey::MsiEndpoint => self.msi_endpoint = Some(value.into()), UnityCatalogConfigKey::ObjectId => self.object_id = Some(value.into()), UnityCatalogConfigKey::MsiResourceId => self.msi_resource_id = Some(value.into()), UnityCatalogConfigKey::FederatedTokenFile => { self.federated_token_file = Some(value.into()) } + UnityCatalogConfigKey::Token => self.bearer_token = Some(value.into()), UnityCatalogConfigKey::UseAzureCli => self.use_azure_cli = str_is_truthy(&value.into()), + UnityCatalogConfigKey::WorkspaceUrl => self.workspace_url = Some(value.into()), }; Ok(self) } From e6699d321750a5431bdd316dc1ade46e2ffae781 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Mon, 2 Oct 2023 12:50:14 -0500 Subject: [PATCH 05/16] feat: support CREATE OR REPLACE --- rust/src/operations/create.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 694d95a74e..7429007a6f 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -292,7 +292,7 @@ impl std::future::IntoFuture for CreateBuilder { Box::pin(async move { let mode = this.mode.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; - if table.object_store().is_delta_table_location().await? { + let table_state = if table.object_store().is_delta_table_location().await? { match mode { SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()), SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), @@ -301,15 +301,19 @@ impl std::future::IntoFuture for CreateBuilder { return Ok(table); } SaveMode::Overwrite => { - todo!("Overwriting on create not yet implemented. Use 'write' operation instead.") + table.load().await?; + &table.state } } - } + } else { + &table.state + }; + let version = commit( table.object_store().as_ref(), &actions, operation, - &table.state, + table_state, None, ) .await?; @@ -456,12 +460,12 @@ mod tests { assert_eq!(table.get_metadata().unwrap().id, first_id); // Check table is overwritten - // let table = CreateBuilder::new() - // .with_object_store(object_store.clone()) - // .with_columns(schema.get_fields().clone()) - // .with_save_mode(SaveMode::Overwrite) - // .await - // .unwrap(); - // assert_ne!(table.get_metadata().unwrap().id, first_id) + let table = CreateBuilder::new() + .with_object_store(object_store.clone()) + .with_columns(schema.get_fields().clone()) + .with_save_mode(SaveMode::Overwrite) + .await + .unwrap(); + assert_ne!(table.get_metadata().unwrap().id, first_id) } } From dd1fa8c39c08965b7d703b13e1d68358130ec730 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 2 Oct 2023 21:35:34 -0700 Subject: [PATCH 06/16] docs: get docs.rs configured correctly again (#1693) # Description The docs build was changed in #1658 to compile on docs.rs with all features, but our crate cannot compile with all-features due to the TLS features, which are mutually exclusive. 
# Related Issue(s) For example: - closes #1692 This has been tested locally with the following command: ``` cargo doc --features azure,datafusion,datafusion,gcs,glue,json,python,s3,unity-experimental ``` --- rust/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 2f31e8b915..66b9c7e5b9 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -13,7 +13,9 @@ readme = "README.md" edition = "2021" [package.metadata.docs.rs] -all-features = true +# We cannot use all_features because TLS features are mutually exclusive. +# We cannot use hdfs feature because it requires Java to be installed. +features = ["azure", "datafusion", "gcs", "glue", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] # arrow From 4da7d66d06985d386e61d3fb124cad6680594dcc Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 3 Oct 2023 01:11:51 -0400 Subject: [PATCH 07/16] fix!: ensure predicates are parsable (#1690) # Description Resolves two issues that impact Datafusion implemented operators 1. When a user has an expression with a scalar built-in scalar function we are unable parse the output predicate since the `DummyContextProvider`'s methods are unimplemented. The provider now uses the user provided state or a default. More work is required in the future to allow a user provided Datafusion state to be used during the conflict checker. 2. The string representation was not parsable by sqlparser since it was not valid SQL. New code was written to transform an expression into a parsable sql string. Current implementation is not exhaustive however common use cases are covered. The delta_datafusion.rs file is getting large so I transformed it into a module. This implementation makes reuse of some code from Datafusion. I've added the Apache License at the top of the file. Let me know if any else is required to be compliant. # Related Issue(s) - closes #1625 --------- Co-authored-by: Will Jones --- rust/src/delta_datafusion/expr.rs | 505 ++++++++++++++++++ .../mod.rs} | 2 + rust/src/operations/delete.rs | 15 +- rust/src/operations/merge.rs | 32 +- rust/src/operations/mod.rs | 12 +- .../transaction/conflict_checker.rs | 5 +- rust/src/operations/transaction/state.rs | 47 +- rust/src/operations/update.rs | 20 +- 8 files changed, 598 insertions(+), 40 deletions(-) create mode 100644 rust/src/delta_datafusion/expr.rs rename rust/src/{delta_datafusion.rs => delta_datafusion/mod.rs} (99%) diff --git a/rust/src/delta_datafusion/expr.rs b/rust/src/delta_datafusion/expr.rs new file mode 100644 index 0000000000..d60fe6666c --- /dev/null +++ b/rust/src/delta_datafusion/expr.rs @@ -0,0 +1,505 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// +// This product includes software from the Datafusion project (Apache 2.0) +// https://github.com/apache/arrow-datafusion +// Display functions and required macros were pulled from https://github.com/apache/arrow-datafusion/blob/ddb95497e2792015d5a5998eec79aac8d37df1eb/datafusion/expr/src/expr.rs + +//! Utility functions for Datafusion's Expressions + +use std::fmt::{self, Display, Formatter, Write}; + +use datafusion_common::ScalarValue; +use datafusion_expr::{ + expr::{InList, ScalarUDF}, + Between, BinaryExpr, Expr, Like, +}; +use sqlparser::ast::escape_quoted_string; + +use crate::DeltaTableError; + +struct SqlFormat<'a> { + expr: &'a Expr, +} + +macro_rules! expr_vec_fmt { + ( $ARRAY:expr ) => {{ + $ARRAY + .iter() + .map(|e| format!("{}", SqlFormat { expr: e })) + .collect::>() + .join(", ") + }}; +} + +struct BinaryExprFormat<'a> { + expr: &'a BinaryExpr, +} + +impl<'a> Display for BinaryExprFormat<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Put parentheses around child binary expressions so that we can see the difference + // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed, + // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are + // equivalent and the parentheses are not necessary. + + fn write_child(f: &mut Formatter<'_>, expr: &Expr, precedence: u8) -> fmt::Result { + match expr { + Expr::BinaryExpr(child) => { + let p = child.op.precedence(); + if p == 0 || p < precedence { + write!(f, "({})", BinaryExprFormat { expr: child })?; + } else { + write!(f, "{}", BinaryExprFormat { expr: child })?; + } + } + _ => write!(f, "{}", SqlFormat { expr })?, + } + Ok(()) + } + + let precedence = self.expr.op.precedence(); + write_child(f, self.expr.left.as_ref(), precedence)?; + write!(f, " {} ", self.expr.op)?; + write_child(f, self.expr.right.as_ref(), precedence) + } +} + +impl<'a> Display for SqlFormat<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.expr { + Expr::Column(c) => write!(f, "{c}"), + Expr::Literal(v) => write!(f, "{}", ScalarValueFormat { scalar: v }), + Expr::Case(case) => { + write!(f, "CASE ")?; + if let Some(e) = &case.expr { + write!(f, "{} ", SqlFormat { expr: e })?; + } + for (w, t) in &case.when_then_expr { + write!( + f, + "WHEN {} THEN {} ", + SqlFormat { expr: w }, + SqlFormat { expr: t } + )?; + } + if let Some(e) = &case.else_expr { + write!(f, "ELSE {} ", SqlFormat { expr: e })?; + } + write!(f, "END") + } + Expr::Not(expr) => write!(f, "NOT {}", SqlFormat { expr }), + Expr::Negative(expr) => write!(f, "(- {})", SqlFormat { expr }), + Expr::IsNull(expr) => write!(f, "{} IS NULL", SqlFormat { expr }), + Expr::IsNotNull(expr) => write!(f, "{} IS NOT NULL", SqlFormat { expr }), + Expr::IsTrue(expr) => write!(f, "{} IS TRUE", SqlFormat { expr }), + Expr::IsFalse(expr) => write!(f, "{} IS FALSE", SqlFormat { expr }), + Expr::IsUnknown(expr) => write!(f, "{} IS UNKNOWN", SqlFormat { expr }), + Expr::IsNotTrue(expr) => write!(f, "{} IS NOT TRUE", SqlFormat { expr }), + Expr::IsNotFalse(expr) => write!(f, "{} IS NOT FALSE", SqlFormat { expr }), + Expr::IsNotUnknown(expr) => write!(f, "{} IS NOT UNKNOWN", SqlFormat { expr }), + Expr::BinaryExpr(expr) => write!(f, "{}", BinaryExprFormat { expr }), + Expr::ScalarFunction(func) => fmt_function(f, &func.fun.to_string(), false, &func.args), + Expr::ScalarUDF(ScalarUDF { fun, args }) => fmt_function(f, &fun.name, false, args), + Expr::Between(Between { + expr, + negated, + low, + high, + }) => 
{ + if *negated { + write!( + f, + "{} NOT BETWEEN {} AND {}", + SqlFormat { expr }, + SqlFormat { expr: low }, + SqlFormat { expr: high } + ) + } else { + write!( + f, + "{} BETWEEN {} AND {}", + SqlFormat { expr }, + SqlFormat { expr: low }, + SqlFormat { expr: high } + ) + } + } + Expr::Like(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive, + }) => { + write!(f, "{}", SqlFormat { expr })?; + let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" }; + if *negated { + write!(f, " NOT")?; + } + if let Some(char) = escape_char { + write!( + f, + " {op_name} {} ESCAPE '{char}'", + SqlFormat { expr: pattern } + ) + } else { + write!(f, " {op_name} {}", SqlFormat { expr: pattern }) + } + } + Expr::SimilarTo(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive: _, + }) => { + write!(f, "{expr}")?; + if *negated { + write!(f, " NOT")?; + } + if let Some(char) = escape_char { + write!(f, " SIMILAR TO {pattern} ESCAPE '{char}'") + } else { + write!(f, " SIMILAR TO {pattern}") + } + } + Expr::InList(InList { + expr, + list, + negated, + }) => { + if *negated { + write!(f, "{expr} NOT IN ({})", expr_vec_fmt!(list)) + } else { + write!(f, "{expr} IN ({})", expr_vec_fmt!(list)) + } + } + _ => Err(fmt::Error), + } + } +} + +/// Format an `Expr` to a parsable SQL expression +pub fn fmt_expr_to_sql(expr: &Expr) -> Result { + let mut s = String::new(); + write!(&mut s, "{}", SqlFormat { expr }).map_err(|_| { + DeltaTableError::Generic("Unable to convert expression to string".to_owned()) + })?; + Ok(s) +} + +fn fmt_function(f: &mut fmt::Formatter, fun: &str, distinct: bool, args: &[Expr]) -> fmt::Result { + let args: Vec = args + .iter() + .map(|arg| format!("{}", SqlFormat { expr: arg })) + .collect(); + + let distinct_str = match distinct { + true => "DISTINCT ", + false => "", + }; + write!(f, "{}({}{})", fun, distinct_str, args.join(", ")) +} + +macro_rules! 
format_option { + ($F:expr, $EXPR:expr) => {{ + match $EXPR { + Some(e) => write!($F, "{e}"), + None => write!($F, "NULL"), + } + }}; +} + +struct ScalarValueFormat<'a> { + scalar: &'a ScalarValue, +} + +impl<'a> fmt::Display for ScalarValueFormat<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.scalar { + ScalarValue::Boolean(e) => format_option!(f, e)?, + ScalarValue::Float32(e) => format_option!(f, e)?, + ScalarValue::Float64(e) => format_option!(f, e)?, + ScalarValue::Int8(e) => format_option!(f, e)?, + ScalarValue::Int16(e) => format_option!(f, e)?, + ScalarValue::Int32(e) => format_option!(f, e)?, + ScalarValue::Int64(e) => format_option!(f, e)?, + ScalarValue::UInt8(e) => format_option!(f, e)?, + ScalarValue::UInt16(e) => format_option!(f, e)?, + ScalarValue::UInt32(e) => format_option!(f, e)?, + ScalarValue::UInt64(e) => format_option!(f, e)?, + ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) => match e { + Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?, + None => write!(f, "NULL")?, + }, + ScalarValue::Binary(e) + | ScalarValue::FixedSizeBinary(_, e) + | ScalarValue::LargeBinary(e) => match e { + Some(l) => write!( + f, + "decode('{}', 'hex')", + l.iter() + .map(|v| format!("{v:02x}")) + .collect::>() + .join("") + )?, + None => write!(f, "NULL")?, + }, + ScalarValue::Null => write!(f, "NULL")?, + _ => return Err(fmt::Error), + }; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use datafusion::prelude::SessionContext; + use datafusion_common::{DFSchema, ScalarValue}; + use datafusion_expr::{col, decode, lit, substring, Expr, ExprSchemable}; + + use crate::{DeltaOps, DeltaTable, Schema, SchemaDataType, SchemaField}; + + use super::fmt_expr_to_sql; + + struct ParseTest { + expr: Expr, + expected: String, + override_expected_expr: Option, + } + + macro_rules! 
simple { + ( $EXPR:expr, $STR:expr ) => {{ + ParseTest { + expr: $EXPR, + expected: $STR, + override_expected_expr: None, + } + }}; + } + + async fn setup_table() -> DeltaTable { + let schema = Schema::new(vec![ + SchemaField::new( + "id".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "value".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "value2".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "modified".to_string(), + SchemaDataType::primitive("string".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "active".to_string(), + SchemaDataType::primitive("boolean".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "money".to_string(), + SchemaDataType::primitive("decimal(12,2)".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "_date".to_string(), + SchemaDataType::primitive("date".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "_timestamp".to_string(), + SchemaDataType::primitive("timestamp".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "_binary".to_string(), + SchemaDataType::primitive("binary".to_string()), + true, + HashMap::new(), + ), + ]); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(schema.get_fields().clone()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + table + } + + #[tokio::test] + async fn test_expr_sql() { + let table = setup_table().await; + + // String expression that we output must be parsable for conflict resolution. + let tests = vec![ + simple!(col("value").eq(lit(3_i64)), "value = 3".to_string()), + simple!(col("active").is_true(), "active IS TRUE".to_string()), + simple!(col("active"), "active".to_string()), + simple!(col("active").eq(lit(true)), "active = true".to_string()), + simple!(col("active").is_null(), "active IS NULL".to_string()), + simple!( + col("modified").eq(lit("2021-02-03")), + "modified = '2021-02-03'".to_string() + ), + simple!( + col("modified").eq(lit("'validate ' escapi\\ng'")), + "modified = '''validate '' escapi\\ng'''".to_string() + ), + simple!(col("money").gt(lit(0.10)), "money > 0.1".to_string()), + ParseTest { + expr: col("_binary").eq(lit(ScalarValue::Binary(Some(vec![0xAA, 0x00, 0xFF])))), + expected: "_binary = decode('aa00ff', 'hex')".to_string(), + override_expected_expr: Some(col("_binary").eq(decode(lit("aa00ff"), lit("hex")))), + }, + simple!( + col("value").between(lit(20_i64), lit(30_i64)), + "value BETWEEN 20 AND 30".to_string() + ), + simple!( + col("value").not_between(lit(20_i64), lit(30_i64)), + "value NOT BETWEEN 20 AND 30".to_string() + ), + simple!( + col("modified").like(lit("abc%")), + "modified LIKE 'abc%'".to_string() + ), + simple!( + col("modified").not_like(lit("abc%")), + "modified NOT LIKE 'abc%'".to_string() + ), + simple!( + (((col("value") * lit(2_i64) + col("value2")) / lit(3_i64)) - col("value")) + .gt(lit(0_i64)), + "(value * 2 + value2) / 3 - value > 0".to_string() + ), + simple!( + col("modified").in_list(vec![lit("a"), lit("c")], false), + "modified IN ('a', 'c')".to_string() + ), + simple!( + col("modified").in_list(vec![lit("a"), lit("c")], true), + "modified NOT IN ('a', 'c')".to_string() + ), + // Validate order of operations is maintained + simple!( + col("modified") + .eq(lit("value")) + .and(col("value").eq(lit(1_i64))) + .or(col("modified") + .eq(lit("value2")) + 
.and(col("value").gt(lit(1_i64)))), + "modified = 'value' AND value = 1 OR modified = 'value2' AND value > 1".to_string() + ), + simple!( + col("modified") + .eq(lit("value")) + .or(col("value").eq(lit(1_i64))) + .and( + col("modified") + .eq(lit("value2")) + .or(col("value").gt(lit(1_i64))), + ), + "(modified = 'value' OR value = 1) AND (modified = 'value2' OR value > 1)" + .to_string() + ), + // Validate functions are correctly parsed + simple!( + substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")), + "substr(modified, 0, 4) = '2021'".to_string() + ), + ]; + + let session = SessionContext::new(); + + for test in tests { + let actual = fmt_expr_to_sql(&test.expr).unwrap(); + assert_eq!(test.expected, actual); + + let actual_expr = table + .state + .parse_predicate_expression(actual, &session.state()) + .unwrap(); + + match test.override_expected_expr { + None => assert_eq!(test.expr, actual_expr), + Some(expr) => assert_eq!(expr, actual_expr), + } + } + + let unsupported_types = vec![ + /* TODO: Determine proper way to display decimal values in an sql expression*/ + simple!( + col("money").gt(lit(ScalarValue::Decimal128(Some(100), 12, 2))), + "money > 0.1".to_string() + ), + simple!( + col("_timestamp").gt(lit(ScalarValue::TimestampMillisecond(Some(100), None))), + "".to_string() + ), + simple!( + col("_timestamp").gt(lit(ScalarValue::TimestampMillisecond( + Some(100), + Some("UTC".into()) + ))), + "".to_string() + ), + simple!( + col("value") + .cast_to::( + &arrow_schema::DataType::Utf8, + &table + .state + .input_schema() + .unwrap() + .as_ref() + .to_owned() + .try_into() + .unwrap() + ) + .unwrap() + .eq(lit("1")), + "CAST(value as STRING) = '1'".to_string() + ), + ]; + + for test in unsupported_types { + assert!(fmt_expr_to_sql(&test.expr).is_err()); + } + } +} diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion/mod.rs similarity index 99% rename from rust/src/delta_datafusion.rs rename to rust/src/delta_datafusion/mod.rs index e542413cfd..166996dddd 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion/mod.rs @@ -76,6 +76,8 @@ use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, const PATH_COLUMN: &str = "__delta_rs_path"; +pub mod expr; + impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index d7f908680d..f07c92e442 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::protocol::{Action, Add, Remove}; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; @@ -263,7 +264,7 @@ async fn execute( // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Delete { - predicate: Some(predicate.canonical_name()), + predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( object_store.as_ref(), @@ -298,7 +299,9 @@ impl std::future::IntoFuture for DeleteBuilder { let predicate = match this.predicate { Some(predicate) => match predicate { Expression::DataFusion(expr) => Some(expr), - Expression::String(s) => Some(this.snapshot.parse_predicate_expression(s)?), + Expression::String(s) => { + Some(this.snapshot.parse_predicate_expression(s, &state)?) 
+ } }, None => None, }; @@ -335,6 +338,7 @@ mod tests { use arrow::record_batch::RecordBatch; use datafusion::assert_batches_sorted_eq; use datafusion::prelude::*; + use serde_json::json; use std::sync::Arc; async fn setup_table(partitions: Option>) -> DeltaTable { @@ -456,7 +460,7 @@ mod tests { assert_eq!(table.version(), 2); assert_eq!(table.get_file_uris().count(), 2); - let (table, metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .delete() .with_predicate(col("value").eq(lit(1))) .await @@ -470,6 +474,11 @@ mod tests { assert_eq!(metrics.num_deleted_rows, Some(1)); assert_eq!(metrics.num_copied_rows, Some(3)); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert_eq!(parameters["predicate"], json!("value = 1")); + let expected = vec![ "+----+-------+------------+", "| id | value | modified |", diff --git a/rust/src/operations/merge.rs b/rust/src/operations/merge.rs index d088fbd3b7..d52dd26819 100644 --- a/rust/src/operations/merge.rs +++ b/rust/src/operations/merge.rs @@ -62,6 +62,7 @@ use serde_json::{Map, Value}; use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; use super::transaction::commit; +use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{parquet_scan_from_actions, register_store}; use crate::operations::datafusion_utils::MetricObserverExec; use crate::operations::write::write_execution_plan; @@ -171,6 +172,7 @@ impl MergeBuilder { let builder = builder(UpdateBuilder::default()); let op = MergeOperation::try_new( &self.snapshot, + &self.state.as_ref(), builder.predicate, builder.updates, OperationType::Update, @@ -204,6 +206,7 @@ impl MergeBuilder { let builder = builder(DeleteBuilder::default()); let op = MergeOperation::try_new( &self.snapshot, + &self.state.as_ref(), builder.predicate, HashMap::default(), OperationType::Delete, @@ -240,6 +243,7 @@ impl MergeBuilder { let builder = builder(InsertBuilder::default()); let op = MergeOperation::try_new( &self.snapshot, + &self.state.as_ref(), builder.predicate, builder.set, OperationType::Insert, @@ -278,6 +282,7 @@ impl MergeBuilder { let builder = builder(UpdateBuilder::default()); let op = MergeOperation::try_new( &self.snapshot, + &self.state.as_ref(), builder.predicate, builder.updates, OperationType::Update, @@ -311,6 +316,7 @@ impl MergeBuilder { let builder = builder(DeleteBuilder::default()); let op = MergeOperation::try_new( &self.snapshot, + &self.state.as_ref(), builder.predicate, HashMap::default(), OperationType::Delete, @@ -448,15 +454,21 @@ struct MergeOperation { impl MergeOperation { pub fn try_new( snapshot: &DeltaTableState, + state: &Option<&SessionState>, predicate: Option, operations: HashMap, r#type: OperationType, ) -> DeltaResult { - let predicate = maybe_into_expr(predicate, snapshot)?; + let context = SessionContext::new(); + let mut s = &context.state(); + if let Some(df_state) = state { + s = df_state; + } + let predicate = maybe_into_expr(predicate, snapshot, s)?; let mut _operations = HashMap::new(); for (column, expr) in operations { - _operations.insert(column, into_expr(expr, snapshot)?); + _operations.insert(column, into_expr(expr, snapshot, s)?); } Ok(MergeOperation { @@ -518,7 +530,7 @@ async fn execute( let predicate = match predicate { Expression::DataFusion(expr) => expr, - Expression::String(s) => snapshot.parse_predicate_expression(s)?, + Expression::String(s) => 
snapshot.parse_predicate_expression(s, &state)?, }; let schema = snapshot.input_schema()?; @@ -675,7 +687,10 @@ async fn execute( }; let action_type = action_type.to_string(); - let predicate = op.predicate.map(|expr| expr.display_name().unwrap()); + let predicate = op + .predicate + .map(|expr| fmt_expr_to_sql(&expr)) + .transpose()?; predicates.push(MergePredicate { action_type, @@ -1035,7 +1050,7 @@ async fn execute( // Do not make a commit when there are zero updates to the state if !actions.is_empty() { let operation = DeltaOperation::Merge { - predicate: Some(predicate.canonical_name()), + predicate: Some(fmt_expr_to_sql(&predicate)?), matched_predicates: match_operations, not_matched_predicates: not_match_target_operations, not_matched_by_source_predicates: not_match_source_operations, @@ -1222,10 +1237,9 @@ mod tests { parameters["notMatchedPredicates"], json!(r#"[{"actionType":"insert"}]"#) ); - // Todo: Expected this predicate to actually be 'value = 1'. Predicate should contain a valid sql expression assert_eq!( parameters["notMatchedBySourcePredicates"], - json!(r#"[{"actionType":"update","predicate":"value = Int32(1)"}]"#) + json!(r#"[{"actionType":"update","predicate":"value = 1"}]"#) ); let expected = vec![ @@ -1447,7 +1461,7 @@ mod tests { assert_eq!(parameters["predicate"], json!("id = source.id")); assert_eq!( parameters["matchedPredicates"], - json!(r#"[{"actionType":"delete","predicate":"source.value <= Int32(10)"}]"#) + json!(r#"[{"actionType":"delete","predicate":"source.value <= 10"}]"#) ); let expected = vec![ @@ -1579,7 +1593,7 @@ mod tests { assert_eq!(parameters["predicate"], json!("id = source.id")); assert_eq!( parameters["notMatchedBySourcePredicates"], - json!(r#"[{"actionType":"delete","predicate":"modified > Utf8(\"2021-02-01\")"}]"#) + json!(r#"[{"actionType":"delete","predicate":"modified > '2021-02-01'"}]"#) ); let expected = vec![ diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 7b6cb27ace..c07b81438b 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -205,6 +205,7 @@ mod datafusion_utils { use arrow_schema::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result as DataFusionResult; + use datafusion::execution::context::SessionState; use datafusion::physical_plan::DisplayAs; use datafusion::physical_plan::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, @@ -240,19 +241,24 @@ mod datafusion_utils { } } - pub(crate) fn into_expr(expr: Expression, snapshot: &DeltaTableState) -> DeltaResult { + pub(crate) fn into_expr( + expr: Expression, + snapshot: &DeltaTableState, + df_state: &SessionState, + ) -> DeltaResult { match expr { Expression::DataFusion(expr) => Ok(expr), - Expression::String(s) => snapshot.parse_predicate_expression(s), + Expression::String(s) => snapshot.parse_predicate_expression(s, df_state), } } pub(crate) fn maybe_into_expr( expr: Option, snapshot: &DeltaTableState, + df_state: &SessionState, ) -> DeltaResult> { Ok(match expr { - Some(predicate) => Some(into_expr(predicate, snapshot)?), + Some(predicate) => Some(into_expr(predicate, snapshot, df_state)?), None => None, }) } diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs index d75e401def..d7a9d3fb86 100644 --- a/rust/src/operations/transaction/conflict_checker.rs +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -114,8 +114,11 @@ impl<'a> TransactionInfo<'a> { actions: &'a Vec, read_whole_table: bool, ) -> DeltaResult { 
+ use datafusion::prelude::SessionContext; + + let session = SessionContext::new(); let read_predicates = read_predicates - .map(|pred| read_snapshot.parse_predicate_expression(pred)) + .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state())) .transpose()?; Ok(Self { txn_id: "".into(), diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 6fe1d65aee..5924609fb7 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -5,6 +5,7 @@ use arrow::datatypes::{ DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use datafusion::datasource::physical_plan::wrap_partition_type_in_dict; +use datafusion::execution::context::SessionState; use datafusion::optimizer::utils::conjunction; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::config::ConfigOptions; @@ -104,7 +105,11 @@ impl DeltaTableState { } /// Parse an expression string into a datafusion [`Expr`] - pub fn parse_predicate_expression(&self, expr: impl AsRef) -> DeltaResult { + pub fn parse_predicate_expression( + &self, + expr: impl AsRef, + df_state: &SessionState, + ) -> DeltaResult { let dialect = &GenericDialect {}; let mut tokenizer = Tokenizer::new(dialect, expr.as_ref()); let tokens = tokenizer @@ -121,7 +126,7 @@ impl DeltaTableState { // TODO should we add the table name as qualifier when available? let df_schema = DFSchema::try_from_qualified_schema("", self.arrow_schema()?.as_ref())?; - let context_provider = DummyContextProvider::default(); + let context_provider = DeltaContextProvider { state: df_state }; let sql_to_rel = SqlToRel::new(&context_provider); Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?) 
@@ -342,34 +347,33 @@ impl PruningStatistics for DeltaTableState { } } -#[derive(Default)] -struct DummyContextProvider { - options: ConfigOptions, +pub(crate) struct DeltaContextProvider<'a> { + state: &'a SessionState, } -impl ContextProvider for DummyContextProvider { +impl<'a> ContextProvider for DeltaContextProvider<'a> { fn get_table_provider(&self, _name: TableReference) -> DFResult> { unimplemented!() } - fn get_function_meta(&self, _name: &str) -> Option> { - unimplemented!() + fn get_function_meta(&self, name: &str) -> Option> { + self.state.scalar_functions().get(name).cloned() } - fn get_aggregate_meta(&self, _name: &str) -> Option> { - unimplemented!() + fn get_aggregate_meta(&self, name: &str) -> Option> { + self.state.aggregate_functions().get(name).cloned() } - fn get_variable_type(&self, _: &[String]) -> Option { + fn get_variable_type(&self, _var: &[String]) -> Option { unimplemented!() } fn options(&self) -> &ConfigOptions { - &self.options + self.state.config_options() } - fn get_window_meta(&self, _name: &str) -> Option> { - unimplemented!() + fn get_window_meta(&self, name: &str) -> Option> { + self.state.window_functions().get(name).cloned() } } @@ -377,24 +381,29 @@ impl ContextProvider for DummyContextProvider { mod tests { use super::*; use crate::operations::transaction::test_utils::{create_add_action, init_table_actions}; + use datafusion::prelude::SessionContext; use datafusion_expr::{col, lit}; #[test] fn test_parse_predicate_expression() { - let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + let snapshot = DeltaTableState::from_actions(init_table_actions(), 0).unwrap(); + let session = SessionContext::new(); + let state = session.state(); // parses simple expression - let parsed = state.parse_predicate_expression("value > 10").unwrap(); + let parsed = snapshot + .parse_predicate_expression("value > 10", &state) + .unwrap(); let expected = col("value").gt(lit::(10)); assert_eq!(parsed, expected); // fails for unknown column - let parsed = state.parse_predicate_expression("non_existent > 10"); + let parsed = snapshot.parse_predicate_expression("non_existent > 10", &state); assert!(parsed.is_err()); // parses complex expression - let parsed = state - .parse_predicate_expression("value > 10 OR value <= 0") + let parsed = snapshot + .parse_predicate_expression("value > 10 OR value <= 0", &state) .unwrap(); let expected = col("value") .gt(lit::(10)) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index b030bc5644..3891c04fd9 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -43,7 +43,9 @@ use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; use crate::{ - delta_datafusion::{find_files, parquet_scan_from_actions, register_store}, + delta_datafusion::{ + expr::fmt_expr_to_sql, find_files, parquet_scan_from_actions, register_store, + }, protocol::{Action, DeltaOperation, Remove}, storage::{DeltaObjectStore, ObjectStoreRef}, table::state::DeltaTableState, @@ -194,7 +196,7 @@ async fn execute( let predicate = match predicate { Some(predicate) => match predicate { Expression::DataFusion(expr) => Some(expr), - Expression::String(s) => Some(snapshot.parse_predicate_expression(s)?), + Expression::String(s) => Some(snapshot.parse_predicate_expression(s, &state)?), }, None => None, }; @@ -203,7 +205,9 @@ async fn execute( .into_iter() .map(|(key, expr)| match expr { Expression::DataFusion(e) => Ok((key, e)), - Expression::String(s) => 
snapshot.parse_predicate_expression(s).map(|e| (key, e)), + Expression::String(s) => snapshot + .parse_predicate_expression(s, &state) + .map(|e| (key, e)), }) .collect::, _>>()?; @@ -416,7 +420,7 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; let operation = DeltaOperation::Update { - predicate: Some(predicate.canonical_name()), + predicate: Some(fmt_expr_to_sql(&predicate)?), }; version = commit( object_store.as_ref(), @@ -481,6 +485,7 @@ mod tests { use arrow_array::Int32Array; use datafusion::assert_batches_sorted_eq; use datafusion::prelude::*; + use serde_json::json; use std::sync::Arc; async fn setup_table(partitions: Option>) -> DeltaTable { @@ -603,7 +608,7 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); - let (table, metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .update() .with_predicate(col("modified").eq(lit("2021-02-03"))) .with_update("modified", lit("2023-05-14")) @@ -617,6 +622,11 @@ mod tests { assert_eq!(metrics.num_updated_rows, 2); assert_eq!(metrics.num_copied_rows, 2); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[commit_info.len() - 1]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert_eq!(parameters["predicate"], json!("modified = '2021-02-03'")); + let expected = vec![ "+----+-------+------------+", "| id | value | modified |", From e326473d3e14d01dced8808bfe8d4fad59c07f79 Mon Sep 17 00:00:00 2001 From: Josiah Parry Date: Tue, 3 Oct 2023 14:23:54 -0400 Subject: [PATCH 08/16] fix typo in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index feb61ec8f8..0a4e3b22ee 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ API that lets you query, inspect, and operate your Delta Lake with ease. - [Quick Start](#quick-start) - [Get Involved](#get-involved) -- [Integartions](#integrations) +- [Integrations](#integrations) - [Features](#features) ## Quick Start From 03dd2abb53ec4301ec33dbd8cb0507b3130e9ccf Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 3 Oct 2023 16:31:29 -0700 Subject: [PATCH 09/16] fix: address formatting errors --- python/deltalake/_internal.pyi | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 5ba3b2b077..747a7f3dde 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -13,11 +13,9 @@ from deltalake.writer import AddAction __version__: str -class RawDeltaTable: - ... +class RawDeltaTable: ... def rust_core_version() -> str: ... - def write_new_deltalake( table_uri: str, schema: pa.Schema, @@ -29,7 +27,6 @@ def write_new_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], ): ... - def batch_distinct(batch: pa.RecordBatch) -> pa.RecordBatch: ... # Can't implement inheritance (see note in src/schema.rs), so this is next @@ -99,12 +96,9 @@ class Field: metadata: Dict[str, Any] def to_json(self) -> str: ... - @staticmethod def from_json(json: str) -> "Field": ... - def to_pyarrow(self) -> pa.Field: ... - @staticmethod def from_pyarrow(type: pa.Field) -> "Field": ... @@ -126,11 +120,9 @@ class Schema: invariants: List[Tuple[str, str]] def to_json(self) -> str: ... - @staticmethod def from_json(json: str) -> "Schema": ... def to_pyarrow(self, as_large_types: bool = False) -> pa.Schema: ... 
- @staticmethod def from_pyarrow(type: pa.Schema) -> "Schema": ... From 1ec7cf1e842b7f855d0324eeedadf228df046060 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 3 Oct 2023 16:43:26 -0700 Subject: [PATCH 10/16] fix: remove an unused import --- python/deltalake/_internal.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 747a7f3dde..f476edd2f1 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -1,5 +1,5 @@ import sys -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Dict, List, Mapping, Optional, Tuple, Union if sys.version_info >= (3, 8): from typing import Literal From 3ba3426f2cf717593359c5c6d8fa5bf74c1bcd58 Mon Sep 17 00:00:00 2001 From: Guilhem de Viry Date: Thu, 5 Oct 2023 04:48:48 +0200 Subject: [PATCH 11/16] feat(python): expose delete operation (#1687) # Description Naively expose the delete operation, with the option to provide a predicate. I first tried to expose a richer API with the Python `FilterType` and DNF expressions, but from what I understand delta-rs doesn't implement generic filters but only `PartitionFilter`. The `DeleteBuilder` also only accepts datafusion expressions. So Instead of hacking my way around or proposing a refactor I went for the simpler approach of sending a string predicate to the rust lib. If this implementation is OK I will add tests. # Related Issue(s) - closes #1417 --------- Co-authored-by: Will Jones --- python/deltalake/table.py | 26 ++++++++++++---- python/src/lib.rs | 15 +++++++++ python/tests/test_delete.py | 58 +++++++++++++++++++++++++++++++++++ rust/src/operations/delete.rs | 5 +-- 4 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 python/tests/test_delete.py diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 7f24145dca..72912d696d 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -199,9 +199,9 @@ def _filters_to_expression(filters: FilterType) -> Expression: Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...]. DNF allows arbitrary boolean logical combinations of single partition predicates. The innermost tuples each describe a single partition predicate. The list of inner -predicates is interpreted as a conjunction (AND), forming a more selective and -multiple partition predicates. Each tuple has format: (key, op, value) and compares -the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If +predicates is interpreted as a conjunction (AND), forming a more selective and +multiple partition predicates. Each tuple has format: (key, op, value) and compares +the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If the op is in or not in, the value must be a collection such as a list, a set or a tuple. The supported type for value is str. Use empty string `''` for Null partition value. @@ -302,13 +302,13 @@ def files( files.__doc__ = f""" Get the .parquet files of the DeltaTable. - + The paths are as they are saved in the delta log, which may either be relative to the table root or absolute URIs. 
-:param partition_filters: the partition filters that will be used for 
+:param partition_filters: the partition filters that will be used for
 getting the matched files
-:return: list of the .parquet files referenced for the current version 
+:return: list of the .parquet files referenced for the current version
 of the DeltaTable
 {_DNF_filter_doc}
 """
@@ -666,6 +666,20 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
         """
         return self._table.get_add_actions(flatten)
 
+    def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
+        """Delete records from a Delta Table that satisfy a predicate.
+
+        When a predicate is not provided then all records are deleted from the Delta
+        Table. Otherwise a scan of the Delta table is performed to mark any files
+        that contain records that satisfy the predicate. Once files are determined
+        they are rewritten without the records.
+
+        :param predicate: a SQL where clause. If not passed, will delete all rows.
+        :return: the metrics from delete.
+        """
+        metrics = self._table.delete(predicate)
+        return json.loads(metrics)
+
 
 class TableOptimizer:
     """API for various table optimization commands."""
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8115c1bb76..ebdc395497 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,6 +21,7 @@ use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
+use deltalake::operations::delete::DeleteBuilder;
 use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::operations::restore::RestoreBuilder;
 use deltalake::operations::transaction::commit;
@@ -594,6 +595,20 @@ impl RawDeltaTable {
             .map_err(PythonError::from)?,
         ))
     }
+
+    /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
+    #[pyo3(signature = (predicate = None))]
+    pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
+        let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
+        if let Some(predicate) = predicate {
+            cmd = cmd.with_predicate(predicate);
+        }
+        let (table, metrics) = rt()?
+ .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + } } fn convert_partition_filters<'a>( diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py new file mode 100644 index 0000000000..f90125b2fb --- /dev/null +++ b/python/tests/test_delete.py @@ -0,0 +1,58 @@ +import pathlib + +import pyarrow as pa +import pyarrow.compute as pc + +from deltalake.table import DeltaTable +from deltalake.writer import write_deltalake + + +def test_delete_no_predicates(existing_table: DeltaTable): + old_version = existing_table.version() + + existing_table.delete() + + last_action = existing_table.history(1)[0] + assert last_action["operation"] == "DELETE" + assert existing_table.version() == old_version + 1 + + dataset = existing_table.to_pyarrow_dataset() + assert dataset.count_rows() == 0 + assert len(existing_table.files()) == 0 + + +def test_delete_a_partition(tmp_path: pathlib.Path, sample_data: pa.Table): + write_deltalake(tmp_path, sample_data, partition_by=["bool"]) + + dt = DeltaTable(tmp_path) + old_version = dt.version() + + mask = pc.equal(sample_data["bool"], False) + expected_table = sample_data.filter(mask) + + dt.delete(predicate="bool = true") + + last_action = dt.history(1)[0] + assert last_action["operation"] == "DELETE" + assert dt.version() == old_version + 1 + + table = dt.to_pyarrow_table() + assert table.equals(expected_table) + assert len(dt.files()) == 1 + + +def test_delete_some_rows(existing_table: DeltaTable): + old_version = existing_table.version() + + existing = existing_table.to_pyarrow_table() + mask = pc.invert(pc.is_in(existing["utf8"], pa.array(["0", "1"]))) + expected_table = existing.filter(mask) + + existing_table.delete(predicate="utf8 in ('0', '1')") + + last_action = existing_table.history(1)[0] + assert last_action["operation"] == "DELETE" + assert existing_table.version() == old_version + 1 + + table = existing_table.to_pyarrow_table() + assert table.equals(expected_table) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index f07c92e442..550e97e6ba 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -31,6 +31,7 @@ use datafusion_common::scalar::ScalarValue; use datafusion_common::DFSchema; use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; +use serde::Serialize; use serde_json::Map; use serde_json::Value; @@ -63,7 +64,7 @@ pub struct DeleteBuilder { app_metadata: Option>, } -#[derive(Default, Debug)] +#[derive(Default, Debug, Serialize)] /// Metrics for the Delete Operation pub struct DeleteMetrics { /// Number of files added @@ -116,7 +117,7 @@ impl DeleteBuilder { self } - /// Writer properties passed to parquet writer for when fiiles are rewritten + /// Writer properties passed to parquet writer for when files are rewritten pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self { self.writer_properties = Some(writer_properties); self From 808790d5807f0acbbbb84215c71eb652617517df Mon Sep 17 00:00:00 2001 From: guilhem-dvr Date: Thu, 5 Oct 2023 11:58:46 +0200 Subject: [PATCH 12/16] docs(python): document the delete operation --- README.md | 26 +++++------ python/docs/source/usage.rst | 89 ++++++++++++++++++++++++------------ 2 files changed, 74 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 0a4e3b22ee..11a39c5a07 100644 --- a/README.md +++ b/README.md @@ -138,22 +138,22 @@ of features outlined in the Delta 
[protocol][protocol] is also [tracked](#protoc | S3 - R2 | ![done] | ![done] | requires lock for concurrent writes | | Azure Blob | ![done] | ![done] | | | Azure ADLS Gen2 | ![done] | ![done] | | -| Microsoft OneLake | ![done] | ![done] | | +| Microsoft OneLake | ![done] | ![done] | | | Google Cloud Storage | ![done] | ![done] | | ### Supported Operations -| Operation | Rust | Python | Description | -| --------------------- | :-----------------: | :-----------------: | ------------------------------------- | -| Create | ![done] | ![done] | Create a new table | -| Read | ![done] | ![done] | Read data from a table | -| Vacuum | ![done] | ![done] | Remove unused files and log entries | -| Delete - partitions | | ![done] | Delete a table partition | -| Delete - predicates | ![done] | | Delete data based on a predicate | -| Optimize - compaction | ![done] | ![done] | Harmonize the size of data file | -| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file | -| Merge | [![semi-done]][merge-rs]| [![open]][merge-py] | Merge two tables (limited to full re-write) | -| FS check | ![done] | | Remove corrupted files from table | +| Operation | Rust | Python | Description | +| --------------------- | :----------------------: | :-----------------: | ------------------------------------------- | +| Create | ![done] | ![done] | Create a new table | +| Read | ![done] | ![done] | Read data from a table | +| Vacuum | ![done] | ![done] | Remove unused files and log entries | +| Delete - partitions | | ![done] | Delete a table partition | +| Delete - predicates | ![done] | ![done] | Delete data based on a predicate | +| Optimize - compaction | ![done] | ![done] | Harmonize the size of data file | +| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file | +| Merge | [![semi-done]][merge-rs] | [![open]][merge-py] | Merge two tables (limited to full re-write) | +| FS check | ![done] | | Remove corrupted files from table | ### Protocol Support Level @@ -172,7 +172,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc | Reader Version | Requirement | Status | | -------------- | ----------------------------------- | ------ | -| Version 2 | Column Mapping | | +| Version 2 | Column Mapping | | | Version 3 | Table Features (requires reader V7) | | [datafusion]: https://github.com/apache/arrow-datafusion diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 6711e7e777..6e7e806f4d 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -14,8 +14,8 @@ of the table, and other metadata such as creation time. >>> dt.version() 3 >>> dt.files() - ['part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet', - 'part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet', + ['part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet', + 'part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet', 'part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet'] @@ -54,7 +54,7 @@ being used. We try to support many of the well-known formats to identify basic s * gs:/// -Alternatively, if you have a data catalog you can load it by reference to a +Alternatively, if you have a data catalog you can load it by reference to a database and table name. Currently supported are AWS Glue and Databricks Unity Catalog. For AWS Glue catalog, use AWS environment variables to authenticate. 
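As a sketch of the Glue path (the database and table names below are placeholders, and AWS credentials are assumed to already be set in the environment), loading a catalog-registered table looks roughly like this:

.. code-block:: python

    >>> from deltalake import DataCatalog, DeltaTable
    >>> database_name = "my_db"    # placeholder Glue database name
    >>> table_name = "my_table"    # placeholder Glue table name
    >>> dt = DeltaTable.from_data_catalog(
    ...     data_catalog=DataCatalog.AWS,
    ...     database_name=database_name,
    ...     table_name=table_name,
    ... )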
@@ -75,7 +75,7 @@ For Databricks Unity Catalog authentication, use environment variables: * DATABRICKS_ACCESS_TOKEN .. code-block:: python - + >>> import os >>> from deltalake import DataCatalog, DeltaTable >>> os.environ['DATABRICKS_WORKSPACE_URL'] = "https://adb-62800498333851.30.azuredatabricks.net" @@ -95,7 +95,7 @@ Custom Storage Backends While delta always needs its internal storage backend to work and be properly configured, in order to manage the delta log, it may sometime be advantageous - and is common practice in the arrow world - to customize the storage interface used for -reading the bulk data. +reading the bulk data. ``deltalake`` will work with any storage compliant with :class:`pyarrow.fs.FileSystem`, however the root of the filesystem has to be adjusted to point at the root of the Delta table. We can achieve this by wrapping the custom filesystem into @@ -105,10 +105,10 @@ a :class:`pyarrow.fs.SubTreeFileSystem`. import pyarrow.fs as fs from deltalake import DeltaTable - + path = "" filesystem = fs.SubTreeFileSystem(path, fs.LocalFileSystem()) - + dt = DeltaTable(path) ds = dt.to_pyarrow_dataset(filesystem=filesystem) @@ -179,8 +179,8 @@ Schema ~~~~~~ The schema for the table is also saved in the transaction log. It can either be -retrieved in the Delta Lake form as :class:`deltalake.schema.Schema` or as a PyArrow -schema. The first allows you to introspect any column-level metadata stored in +retrieved in the Delta Lake form as :class:`deltalake.schema.Schema` or as a PyArrow +schema. The first allows you to introspect any column-level metadata stored in the schema, while the latter represents the schema the table will be loaded into. Use :meth:`DeltaTable.schema` to retrieve the delta lake schema: @@ -212,14 +212,14 @@ History ~~~~~~~ Depending on what system wrote the table, the delta table may have provenance -information describing what operations were performed on the table, when, and +information describing what operations were performed on the table, when, and by whom. This information is retained for 30 days by default, unless otherwise specified by the table configuration ``delta.logRetentionDuration``. .. note:: - This information is not written by all writers and different writers may use - different schemas to encode the actions. For Spark's format, see: + This information is not written by all writers and different writers may use + different schemas to encode the actions. For Spark's format, see: https://docs.delta.io/latest/delta-utility.html#history-schema To view the available history, use :meth:`DeltaTable.history`: @@ -239,7 +239,7 @@ To view the available history, use :meth:`DeltaTable.history`: Current Add Actions ~~~~~~~~~~~~~~~~~~~ -The active state for a delta table is determined by the Add actions, which +The active state for a delta table is determined by the Add actions, which provide the list of files that are part of the table and metadata about them, such as creation time, size, and statistics. You can get a data frame of the add actions data using :meth:`DeltaTable.get_add_actions`: @@ -268,18 +268,18 @@ Querying Delta Tables --------------------- Delta tables can be queried in several ways. By loading as Arrow data or an Arrow -dataset, they can be used by compatible engines such as Pandas and DuckDB. By +dataset, they can be used by compatible engines such as Pandas and DuckDB. By passing on the list of files, they can be loaded into other engines such as Dask. 
Delta tables are often larger than can fit into memory on a single computer, so -this module provides ways to read only the parts of the data you need. Partition +this module provides ways to read only the parts of the data you need. Partition filters allow you to skip reading files that are part of irrelevant partitions. Only loading the columns required also saves memory. Finally, some methods allow reading tables batch-by-batch, allowing you to process the whole table while only having a portion loaded at any given time. To load into Pandas or a PyArrow table use the :meth:`DeltaTable.to_pandas` and -:meth:`DeltaTable.to_pyarrow_table` methods, respectively. Both of these +:meth:`DeltaTable.to_pyarrow_table` methods, respectively. Both of these support filtering partitions and selecting particular columns. .. code-block:: python @@ -301,10 +301,10 @@ support filtering partitions and selecting particular columns. pyarrow.Table value: string -Converting to a PyArrow Dataset allows you to filter on columns other than +Converting to a PyArrow Dataset allows you to filter on columns other than partition columns and load the result as a stream of batches rather than a single -table. Convert to a dataset using :meth:`DeltaTable.to_pyarrow_dataset`. Filters -applied to datasets will use the partition values and file statistics from the +table. Convert to a dataset using :meth:`DeltaTable.to_pyarrow_dataset`. Filters +applied to datasets will use the partition values and file statistics from the Delta transaction log and push down any other filters to the scanning operation. .. code-block:: python @@ -366,7 +366,7 @@ you can pass them to ``dask.dataframe.read_parquet``: >>> df Dask DataFrame Structure: value year month day - npartitions=6 + npartitions=6 object category[known] category[known] category[known] ... ... ... ... ... ... ... ... ... @@ -391,7 +391,7 @@ Vacuuming tables ~~~~~~~~~~~~~~~~ Vacuuming a table will delete any files that have been marked for deletion. This -may make some past versions of a table invalid, so this can break time travel. +may make some past versions of a table invalid, so this can break time travel. However, it will save storage space. Vacuum will retain files in a certain window, by default one week, so time travel will still work in shorter ranges. @@ -406,8 +406,8 @@ only list the files to be deleted. Pass ``dry_run=False`` to actually delete fil >>> dt = DeltaTable("../rust/tests/data/simple_table") >>> dt.vacuum() - ['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet', - '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet', + ['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet', + '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet', '../rust/tests/data/simple_table/part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet', ...] >>> dt.vacuum(dry_run=False) # Don't run this unless you are sure! @@ -466,11 +466,11 @@ DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches. >>> write_deltalake('path/to/table', df) .. note:: - :py:func:`write_deltalake` accepts a Pandas DataFrame, but will convert it to - a Arrow table before writing. See caveats in :doc:`pyarrow:python/pandas`. + :py:func:`write_deltalake` accepts a Pandas DataFrame, but will convert it to + a Arrow table before writing. See caveats in :doc:`pyarrow:python/pandas`. 
-By default, writes create a new table and error if it already exists. This is -controlled by the ``mode`` parameter, which mirrors the behavior of Spark's +By default, writes create a new table and error if it already exists. This is +controlled by the ``mode`` parameter, which mirrors the behavior of Spark's :py:meth:`pyspark.sql.DataFrameWriter.saveAsTable` DataFrame method. To overwrite pass in ``mode='overwrite'`` and to append pass in ``mode='append'``: @@ -480,7 +480,7 @@ to append pass in ``mode='append'``: >>> write_deltalake('path/to/table', df, mode='append') :py:meth:`write_deltalake` will raise :py:exc:`ValueError` if the schema of -the data passed to it differs from the existing table's schema. If you wish to +the data passed to it differs from the existing table's schema. If you wish to alter the schema as part of an overwrite pass in ``overwrite_schema=True``. @@ -513,6 +513,39 @@ This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent. +Removing data +~~~~~~~~~~~~~ + +.. py:currentmodule:: deltalake.table + +You can remove rows from a table with :meth:`DeltaTable.delete`. A SQL where clause can +be provided to only remove some rows. If the clause matches some partition values, then +the files under those partition values will be removed. If the clause matches rows +inside some files, then those files will rewritten without the matched rows. Omitting +the clause will remove all files from the table. + +.. code-block:: python + + >>> from deltalake import DeltaTable, write_deltalake + >>> df = pd.DataFrame({'a': [1, 2, 3], 'to_delete': [False, False, True]}) + >>> write_deltalake('path/to/table', df) + + >>> table = DeltaTable('path/to/table') + >>> table.delete(predicate="to_delete = true") + {'num_added_files': 1, 'num_removed_files': 1, 'num_deleted_rows': 1, 'num_copied_rows': 2, 'execution_time_ms': 11081, 'scan_time_ms': 3721, 'rewrite_time_ms': 7} + + >>> table.to_pandas() + a to_delete + 0 1 False + 1 2 False + +.. note:: + + :meth:`DeltaTable.delete` does not delete files from storage but only updates the + table state to one where the deleted rows are no longer present. See + `Vacuuming tables`_ for more information. + + Restoring tables ~~~~~~~~~~~~~~~~ From eea9d2a070651196683be56b3333c82cb1fc6e4d Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 7 Oct 2023 10:37:42 -0700 Subject: [PATCH 13/16] Introduce some redundant type definitions to the mypy stub --- python/Makefile | 2 +- python/deltalake/_internal.pyi | 93 +++++++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/python/Makefile b/python/Makefile index 634c154723..21cabd91a3 100644 --- a/python/Makefile +++ b/python/Makefile @@ -66,7 +66,7 @@ check-rust: ## Run check on Rust .PHONY: check-python check-python: ## Run check on Python $(info Check Python black) - black --check . + black --check --diff . $(info Check Python ruff) ruff check . $(info Check Python mypy) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 855558e70c..945bbc8d60 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -13,7 +13,91 @@ from deltalake.writer import AddAction __version__: str -class RawDeltaTable: ... 
+class RawDeltaTableMetaData: + id: int + name: str + description: str + partition_columns: List[str] + created_time: int + configuration: Dict[str, str] + +class RawDeltaTable: + schema: Any + + def __init__( + self, + table_uri: str, + version: Optional[int], + storage_options: Optional[Dict[str, str]], + without_files: bool, + log_buffer_size: Optional[int], + ) -> None: ... + @staticmethod + def get_table_uri_from_data_catalog( + data_catalog: str, + database_name: str, + table_name: str, + data_catalog_id: Optional[str] = None, + catalog_options: Optional[Dict[str, str]] = None, + ) -> str: ... + def table_uri(self) -> str: ... + def version(self) -> int: ... + def metadata(self) -> RawDeltaTableMetaData: ... + def protocol_versions(self) -> List[int]: ... + def load_version(self, version: int) -> None: ... + def load_with_datetime(self, ds: str) -> None: ... + def files_by_partitions( + self, partitions_filters: Optional[FilterType] + ) -> List[str]: ... + def files(self, partition_filters: Optional[FilterType]) -> List[str]: ... + def file_uris(self, partition_filters: Optional[FilterType]) -> List[str]: ... + def vacuum( + self, + dry_run: bool, + retention_hours: Optional[int], + enforce_retention_duration: bool, + ) -> List[str]: ... + def compact_optimize( + self, + partition_filters: Optional[FilterType], + target_size: Optional[int], + max_concurrent_tasks: Optional[int], + min_commit_interval: Optional[int], + ) -> str: ... + def z_order_optimize( + self, + z_order_columns: List[str], + partition_filters: Optional[FilterType], + target_size: Optional[int], + max_concurrent_tasks: Optional[int], + max_spill_size: Optional[int], + min_commit_interval: Optional[int], + ) -> str: ... + def restore( + self, + target: Optional[Any], + ignore_missing_files: bool, + protocol_downgrade_allowed: bool, + ) -> str: ... + def history(self, limit: Optional[int]) -> List[str]: ... + def update_incremental(self) -> None: ... + def dataset_partitions( + self, schema: pa.Schema, partition_filters: Optional[FilterType] + ) -> List[Any]: ... + def create_checkpoint(self) -> None: ... + def get_add_actions(self, flatten: bool) -> pa.RecordBatch: ... + def delete(self, predicate: Optional[str]) -> str: ... + def get_active_partitions( + self, partitions_filters: Optional[FilterType] = None + ) -> Any: ... + def create_write_transaction( + self, + add_actions: List[AddAction], + mode: str, + partition_by: List[str], + schema: pa.Schema, + partitions_filters: Optional[FilterType], + ) -> None: ... def rust_core_version() -> str: ... def write_new_deltalake( @@ -26,7 +110,7 @@ def write_new_deltalake( description: Optional[str], configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], -): ... +) -> None: ... def batch_distinct(batch: pa.RecordBatch) -> pa.RecordBatch: ... # Can't implement inheritance (see note in src/schema.rs), so this is next @@ -241,3 +325,8 @@ class DeltaProtocolError(DeltaError): """Raised when a violation with the Delta protocol specs ocurred.""" pass + +FilterLiteralType = Tuple[str, str, Any] +FilterConjunctionType = List[FilterLiteralType] +FilterDNFType = List[FilterConjunctionType] +FilterType = Union[FilterConjunctionType, FilterDNFType] From 66ad28f12ecd7c1de807829a1102ebb986696284 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sat, 7 Oct 2023 10:57:50 -0700 Subject: [PATCH 14/16] chore: fix new clippy lints introduced in Rust 1.73 --- rust/src/operations/optimize.rs | 5 +---- rust/src/protocol/mod.rs | 7 ++++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index eafc768519..d4d9080614 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -876,10 +876,7 @@ fn build_compaction_plan( let part = PartitionTuples::from_hashmap(partition_keys, &add.partition_values); - partition_files - .entry(part) - .or_insert_with(Vec::new) - .push(object_meta); + partition_files.entry(part).or_default().push(object_meta); } for file in partition_files.values_mut() { diff --git a/rust/src/protocol/mod.rs b/rust/src/protocol/mod.rs index b03ebe4a9a..424b83284e 100644 --- a/rust/src/protocol/mod.rs +++ b/rust/src/protocol/mod.rs @@ -836,9 +836,10 @@ impl DeltaOperation { pub fn name(&self) -> &str { // operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys match &self { - DeltaOperation::Create { mode, .. } if matches!(mode, SaveMode::Overwrite) => { - "CREATE OR REPLACE TABLE" - } + DeltaOperation::Create { + mode: SaveMode::Overwrite, + .. + } => "CREATE OR REPLACE TABLE", DeltaOperation::Create { .. } => "CREATE TABLE", DeltaOperation::Write { .. } => "WRITE", DeltaOperation::Delete { .. } => "DELETE", From e8ae4937a67d39a799aec3a8a73a21db6a202982 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 7 Oct 2023 12:40:49 -0700 Subject: [PATCH 15/16] Update the sphinx ignore for building =_= --- python/docs/source/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index c11b808659..cea081579d 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -61,7 +61,7 @@ def get_release_version() -> str: ("py:class", "pyarrow._fs.FileInfo"), ("py:class", "pyarrow._fs.FileSelector"), ("py:class", "pyarrow._fs.FileSystemHandler"), - ("py:class", "RawDeltaTable"), + ("py:class", "deltalake._internal.RawDeltaTable"), ("py:class", "pandas.DataFrame"), ("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"), ("py:class", "pathlib.Path"), From 94b41b78a48afb4b4a3a21247d963b343edcebcc Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Sun, 8 Oct 2023 23:23:31 +0200 Subject: [PATCH 16/16] Enable prebuffer --- python/deltalake/table.py | 6 +++++- python/stubs/pyarrow/dataset.pyi | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 72912d696d..ebebc685f3 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -24,6 +24,7 @@ Expression, FileSystemDataset, ParquetFileFormat, + ParquetFragmentScanOptions, ParquetReadOptions, ) @@ -538,7 +539,10 @@ def to_pyarrow_dataset( ) ) - format = ParquetFileFormat(read_options=parquet_read_options) + format = ParquetFileFormat( + read_options=parquet_read_options, + default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True), + ) fragments = [ format.make_fragment( diff --git a/python/stubs/pyarrow/dataset.pyi b/python/stubs/pyarrow/dataset.pyi index af3c952688..2bb9f306f0 100644 --- a/python/stubs/pyarrow/dataset.pyi +++ b/python/stubs/pyarrow/dataset.pyi @@ -6,6 +6,7 @@ Expression: Any field: Any partitioning: Any FileSystemDataset: Any +ParquetFragmentScanOptions: Any ParquetFileFormat: Any 
 ParquetReadOptions: Any
 ParquetFileWriteOptions: Any
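The pre-buffer change above only alters how `to_pyarrow_dataset` constructs its `ParquetFileFormat`. As a rough, self-contained sketch of what that option does (the `data/` path is a placeholder for any plain Parquet directory, not a Delta table):

``` python
import pyarrow.dataset as ds

# Scan options equivalent to what to_pyarrow_dataset() now passes for each fragment:
# pre_buffer=True coalesces column-chunk reads into fewer, larger requests, which
# mainly helps on stores with high per-request latency such as S3 or ADLS.
scan_options = ds.ParquetFragmentScanOptions(pre_buffer=True)
parquet_format = ds.ParquetFileFormat(default_fragment_scan_options=scan_options)

# Placeholder path, for illustration only.
dataset = ds.dataset("data/", format=parquet_format)
print(dataset.schema)
```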