Skip to content

Commit

Permalink
feat(python): expose convert_to_deltalake (#1842)
Browse files Browse the repository at this point in the history
# Description
Exposes added `convert to delta` functionality by @junjunjd to Python
API.


# Related Issue(s)
- closes #1767

---------

Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
  • Loading branch information
ion-elgreco and roeap authored Nov 19, 2023
1 parent dd6b453 commit e48b8a7
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 13 deletions.
16 changes: 15 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use serde_json::{Map, Value};
use std::{
collections::{HashMap, HashSet},
num::TryFromIntError,
str::Utf8Error,
str::{FromStr, Utf8Error},
sync::Arc,
};

Expand Down Expand Up @@ -82,6 +82,20 @@ pub enum PartitionStrategy {
Hive,
}

impl FromStr for PartitionStrategy {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"hive" => Ok(PartitionStrategy::Hive),
_ => Err(DeltaTableError::Generic(format!(
"Invalid partition strategy provided {}",
s
))),
}
}
}

/// Build an operation to convert a Parquet table to a [`DeltaTable`] in place
pub struct ConvertToDeltaBuilder {
log_store: Option<LogStoreRef>,
Expand Down
20 changes: 19 additions & 1 deletion crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;

use crate::errors::DeltaResult;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove};
use crate::logstore::LogStore;
use crate::table::CheckPoint;
Expand Down Expand Up @@ -589,6 +590,23 @@ pub enum SaveMode {
Ignore,
}

impl FromStr for SaveMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(DeltaTableError::Generic(format!(
"Invalid save mode provided: {}, only these are supported: ['append', 'overwrite', 'error', 'ignore']",
s
))),
}
}
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum OutputMode {
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
from .schema import Schema as Schema
from .table import DeltaTable as DeltaTable
from .table import Metadata as Metadata
from .writer import convert_to_deltalake as convert_to_deltalake
from .writer import write_deltalake as write_deltalake
10 changes: 10 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],
partition_strategy: Optional[Literal["hive"]],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

# Can't implement inheritance (see note in src/schema.rs), so this is next
Expand Down
55 changes: 55 additions & 0 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from ._internal import DeltaDataChecker as _DeltaDataChecker
from ._internal import batch_distinct
from ._internal import convert_to_deltalake as _convert_to_deltalake
from ._internal import write_new_deltalake as _write_new_deltalake
from .exceptions import DeltaProtocolError, TableNotFoundError
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable
Expand Down Expand Up @@ -391,6 +392,60 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
table.update_incremental()


def convert_to_deltalake(
uri: Union[str, Path],
mode: Literal["error", "ignore"] = "error",
partition_by: Optional[pa.Schema] = None,
partition_strategy: Optional[Literal["hive"]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
storage_options: Optional[Dict[str, str]] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""
`Convert` parquet tables `to delta` tables.
Currently only HIVE partitioned tables are supported. `Convert to delta` creates
a transaction log commit with add actions, and additional properties provided such
as configuration, name, and description.
Args:
uri: URI of a table.
partition_by: Optional partitioning schema if table is partitioned.
partition_strategy: Optional partition strategy to read and convert
mode: How to handle existing data. Default is to error if table already exists.
If 'ignore', will not convert anything if table already exists.
name: User-provided identifier for this table.
description: User-provided description for this table.
configuration: A map containing configuration options for the metadata action.
storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
custom_metadata: custom metadata that will be added to the transaction commit
"""
if partition_by is not None and partition_strategy is None:
raise ValueError("Partition strategy has to be provided with partition_by.")

if partition_strategy is not None and partition_strategy != "hive":
raise ValueError(
"Currently only `hive` partition strategy is supported to be converted."
)

if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None:
return

_convert_to_deltalake(
str(uri),
partition_by,
partition_strategy,
name,
description,
configuration,
storage_options,
custom_metadata,
)
return


def __enforce_append_only(
table: Optional[DeltaTable],
configuration: Optional[Mapping[str, Optional[str]]],
Expand Down
68 changes: 57 additions & 11 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{Action, Add, Invariant, Metadata, Remove, StructType};
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
Expand All @@ -43,6 +44,7 @@ use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
use crate::error::PythonError;
Expand Down Expand Up @@ -758,7 +760,8 @@ impl RawDeltaTable {
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<()> {
let mode = save_mode_from_str(mode)?;
let mode = mode.parse().map_err(PythonError::from)?;

let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
Expand Down Expand Up @@ -1088,16 +1091,6 @@ fn batch_distinct(batch: PyArrowType<RecordBatch>) -> PyResult<PyArrowType<Recor
))
}

fn save_mode_from_str(value: &str) -> PyResult<SaveMode> {
match value {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(PyValueError::new_err("Invalid save mode")),
}
}

fn current_timestamp() -> i64 {
let start = SystemTime::now();
let since_the_epoch = start
Expand Down Expand Up @@ -1180,6 +1173,58 @@ fn write_new_deltalake(
Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn convert_to_deltalake(
uri: String,
partition_schema: Option<PyArrowType<ArrowSchema>>,
partition_strategy: Option<String>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut builder = ConvertToDeltaBuilder::new().with_location(uri);

if let Some(part_schema) = partition_schema {
let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
builder = builder.with_partition_schema(schema.fields().clone());
}

if let Some(partition_strategy) = &partition_strategy {
let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?;
builder = builder.with_partition_strategy(strategy);
}

if let Some(name) = &name {
builder = builder.with_table_name(name);
}

if let Some(description) = &description {
builder = builder.with_comment(description);
}

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

if let Some(strg_options) = storage_options {
builder = builder.with_storage_options(strg_options);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
Ok(())
}

#[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]
struct PyDeltaDataChecker {
inner: DeltaDataChecker,
Expand Down Expand Up @@ -1225,6 +1270,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?;
m.add_class::<RawDeltaTable>()?;
m.add_class::<RawDeltaTableMetaData>()?;
Expand Down
97 changes: 97 additions & 0 deletions python/tests/test_convert_to_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pathlib

import pyarrow as pa
import pyarrow.dataset as ds
import pytest

from deltalake import convert_to_deltalake
from deltalake.exceptions import DeltaError
from deltalake.table import DeltaTable


def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
)

name = "converted_table"
description = "parquet table converted to delta table with delta-rs"
convert_to_deltalake(
tmp_path,
name=name,
description=description,
configuration={"delta.AppendOnly": "True"},
)

dt = DeltaTable(tmp_path)

assert dt.version() == 0
assert dt.files() == ["part-0.parquet"]
assert dt.metadata().name == name
assert dt.metadata().description == description
assert dt.metadata().configuration == {"delta.AppendOnly": "True"}


def test_convert_delta_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
)

convert_to_deltalake(
tmp_path,
)

with pytest.raises(DeltaError):
convert_to_deltalake(
tmp_path,
)

convert_to_deltalake(tmp_path, mode="ignore")


def test_convert_delta_with_partitioning(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
partitioning=["utf8"],
partitioning_flavor="hive",
)

with pytest.raises(
DeltaError,
match="Generic error: The schema of partition columns must be provided to convert a Parquet table to a Delta table",
):
convert_to_deltalake(
tmp_path,
)
with pytest.raises(
ValueError, match="Partition strategy has to be provided with partition_by"
):
convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
)

with pytest.raises(
ValueError,
match="Currently only `hive` partition strategy is supported to be converted.",
):
convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
partition_strategy="directory",
)

convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
partition_strategy="hive",
)

0 comments on commit e48b8a7

Please sign in to comment.