feat(python): expose convert_to_deltalake #1842

Merged
16 changes: 15 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
@@ -27,7 +27,7 @@ use serde_json::{Map, Value};
use std::{
collections::{HashMap, HashSet},
num::TryFromIntError,
str::Utf8Error,
str::{FromStr, Utf8Error},
sync::Arc,
};

@@ -82,6 +82,20 @@ pub enum PartitionStrategy {
Hive,
}

impl FromStr for PartitionStrategy {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"hive" => Ok(PartitionStrategy::Hive),
_ => Err(DeltaTableError::Generic(format!(
"Invalid partition strategy provided {}",
s
))),
}
}
}

/// Build an operation to convert a Parquet table to a [`DeltaTable`] in place
pub struct ConvertToDeltaBuilder {
log_store: Option<LogStoreRef>,
20 changes: 19 additions & 1 deletion crates/deltalake-core/src/protocol/mod.rs
@@ -23,8 +23,9 @@ use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;

use crate::errors::DeltaResult;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove};
use crate::logstore::LogStore;
use crate::table::CheckPoint;
@@ -589,6 +590,23 @@ pub enum SaveMode {
Ignore,
}

impl FromStr for SaveMode {
type Err = DeltaTableError;

fn from_str(s: &str) -> DeltaResult<Self> {
match s.to_ascii_lowercase().as_str() {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(DeltaTableError::Generic(format!(
"Invalid save mode provided: {}, only these are supported: ['append', 'overwrite', 'error', 'ignore']",
s
))),
}
}
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum OutputMode {
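For context, the `SaveMode` `FromStr` impl above parses the same mode strings the Python bindings already pass through. A minimal sketch from the Python side (the path is a placeholder, not from this PR):

```python
import pyarrow as pa

from deltalake import write_deltalake

table = pa.table({"x": [1, 2, 3]})

# The four strings recognized by SaveMode::from_str:
write_deltalake("/tmp/example_table", table, mode="append")     # add new data
write_deltalake("/tmp/example_table", table, mode="overwrite")  # replace existing data
write_deltalake("/tmp/example_table", table, mode="ignore")     # no-op if the table exists
# mode="error" (the default) raises if the table already exists
```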
1 change: 1 addition & 0 deletions python/deltalake/__init__.py
@@ -6,4 +6,5 @@
from .schema import Schema as Schema
from .table import DeltaTable as DeltaTable
from .table import Metadata as Metadata
from .writer import convert_to_deltalake as convert_to_deltalake
from .writer import write_deltalake as write_deltalake
10 changes: 10 additions & 0 deletions python/deltalake/_internal.pyi
@@ -140,6 +140,16 @@ def write_new_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
partition_by: Optional[pyarrow.Schema],
partition_strategy: Optional[Literal["hive"]],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

# Can't implement inheritance (see note in src/schema.rs), so this is next
55 changes: 55 additions & 0 deletions python/deltalake/writer.py
@@ -38,6 +38,7 @@

from ._internal import DeltaDataChecker as _DeltaDataChecker
from ._internal import batch_distinct
from ._internal import convert_to_deltalake as _convert_to_deltalake
from ._internal import write_new_deltalake as _write_new_deltalake
from .exceptions import DeltaProtocolError, TableNotFoundError
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable
@@ -391,6 +392,60 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
table.update_incremental()


def convert_to_deltalake(
uri: Union[str, Path],
mode: Literal["error", "ignore"] = "error",
partition_by: Optional[pa.Schema] = None,
partition_strategy: Optional[Literal["hive"]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Mapping[str, Optional[str]]] = None,
storage_options: Optional[Dict[str, str]] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""
`Convert` parquet tables `to delta` tables.

Currently only HIVE partitioned tables are supported. `Convert to delta` creates
a transaction log commit with add actions, and additional properties provided such
as configuration, name, and description.

Args:
uri: URI of a table.
partition_by: Optional partitioning schema if table is partitioned.
partition_strategy: Optional partition strategy to read and convert
mode: How to handle existing data. Default is to error if table already exists.
If 'ignore', will not convert anything if table already exists.
name: User-provided identifier for this table.
description: User-provided description for this table.
configuration: A map containing configuration options for the metadata action.
storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
custom_metadata: custom metadata that will be added to the transaction commit
"""
if partition_by is not None and partition_strategy is None:
raise ValueError("Partition strategy has to be provided with partition_by.")

if partition_strategy is not None and partition_strategy != "hive":
raise ValueError(
"Currently only `hive` partition strategy is supported to be converted."
)

if mode == "ignore" and try_get_deltatable(uri, storage_options) is not None:
return

_convert_to_deltalake(
str(uri),
partition_by,
partition_strategy,
name,
description,
configuration,
storage_options,
custom_metadata,
)


def __enforce_append_only(
table: Optional[DeltaTable],
configuration: Optional[Mapping[str, Optional[str]]],
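Taken together, the new API can be exercised end to end. A minimal sketch, assuming a local path and column names chosen for illustration:

```python
import pyarrow as pa
import pyarrow.dataset as ds

from deltalake import DeltaTable, convert_to_deltalake

# Write a plain hive-partitioned Parquet dataset first.
data = pa.table({"id": [1, 2, 3], "part": ["a", "a", "b"]})
ds.write_dataset(
    data,
    "/tmp/parquet_table",
    format="parquet",
    partitioning=["part"],
    partitioning_flavor="hive",
)

# Convert it in place; partition_by carries the schema of the partition columns.
convert_to_deltalake(
    "/tmp/parquet_table",
    partition_by=pa.schema([pa.field("part", pa.string())]),
    partition_strategy="hive",
    name="example",
)

assert DeltaTable("/tmp/parquet_table").version() == 0
```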
68 changes: 57 additions & 11 deletions python/src/lib.rs
@@ -27,6 +27,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{Action, Add, Invariant, Metadata, Remove, StructType};
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
@@ -43,6 +44,7 @@ use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
use crate::error::PythonError;
@@ -758,7 +760,8 @@ impl RawDeltaTable {
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<()> {
let mode = save_mode_from_str(mode)?;
let mode = mode.parse().map_err(PythonError::from)?;

let schema: StructType = (&schema.0).try_into().map_err(PythonError::from)?;

let existing_schema = self._table.get_schema().map_err(PythonError::from)?;
@@ -1088,16 +1091,6 @@ fn batch_distinct(batch: PyArrowType<RecordBatch>) -> PyResult<PyArrowType<RecordBatch>> {
))
}

fn save_mode_from_str(value: &str) -> PyResult<SaveMode> {
match value {
"append" => Ok(SaveMode::Append),
"overwrite" => Ok(SaveMode::Overwrite),
"error" => Ok(SaveMode::ErrorIfExists),
"ignore" => Ok(SaveMode::Ignore),
_ => Err(PyValueError::new_err("Invalid save mode")),
}
}

fn current_timestamp() -> i64 {
let start = SystemTime::now();
let since_the_epoch = start
@@ -1180,6 +1173,58 @@ fn write_new_deltalake(
Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn convert_to_deltalake(
uri: String,
partition_schema: Option<PyArrowType<ArrowSchema>>,
partition_strategy: Option<String>,
name: Option<String>,
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut builder = ConvertToDeltaBuilder::new().with_location(uri);

if let Some(part_schema) = partition_schema {
let schema: StructType = (&part_schema.0).try_into().map_err(PythonError::from)?;
builder = builder.with_partition_schema(schema.fields().clone());
}

if let Some(partition_strategy) = &partition_strategy {
let strategy: PartitionStrategy = partition_strategy.parse().map_err(PythonError::from)?;
builder = builder.with_partition_strategy(strategy);
}

if let Some(name) = &name {
builder = builder.with_table_name(name);
}

if let Some(description) = &description {
builder = builder.with_comment(description);
}

if let Some(config) = configuration {
builder = builder.with_configuration(config);
};

if let Some(strg_options) = storage_options {
builder = builder.with_storage_options(strg_options);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
Ok(())
}

#[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")]
struct PyDeltaDataChecker {
inner: DeltaDataChecker,
Expand Down Expand Up @@ -1225,6 +1270,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?;
m.add_class::<RawDeltaTable>()?;
m.add_class::<RawDeltaTableMetaData>()?;
97 changes: 97 additions & 0 deletions python/tests/test_convert_to_delta.py
@@ -0,0 +1,97 @@
import pathlib

import pyarrow as pa
import pyarrow.dataset as ds
import pytest

from deltalake import convert_to_deltalake
from deltalake.exceptions import DeltaError
from deltalake.table import DeltaTable


def test_local_convert_to_delta(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
)

name = "converted_table"
description = "parquet table converted to delta table with delta-rs"
convert_to_deltalake(
tmp_path,
name=name,
description=description,
configuration={"delta.AppendOnly": "True"},
)

dt = DeltaTable(tmp_path)

assert dt.version() == 0
assert dt.files() == ["part-0.parquet"]
assert dt.metadata().name == name
assert dt.metadata().description == description
assert dt.metadata().configuration == {"delta.AppendOnly": "True"}


def test_convert_delta_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
)

convert_to_deltalake(
tmp_path,
)

with pytest.raises(DeltaError):
convert_to_deltalake(
tmp_path,
)

convert_to_deltalake(tmp_path, mode="ignore")


def test_convert_delta_with_partitioning(tmp_path: pathlib.Path, sample_data: pa.Table):
ds.write_dataset(
sample_data,
tmp_path,
format="parquet",
existing_data_behavior="overwrite_or_ignore",
partitioning=["utf8"],
partitioning_flavor="hive",
)

with pytest.raises(
DeltaError,
match="Generic error: The schema of partition columns must be provided to convert a Parquet table to a Delta table",
):
convert_to_deltalake(
tmp_path,
)
with pytest.raises(
ValueError, match="Partition strategy has to be provided with partition_by"
):
convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
)

with pytest.raises(
ValueError,
match="Currently only `hive` partition strategy is supported to be converted.",
):
convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
partition_strategy="directory",
)

convert_to_deltalake(
tmp_path,
partition_by=pa.schema([pa.field("utf8", pa.string())]),
partition_strategy="hive",
)
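One extra assertion that could be appended to the partitioning test (a sketch; `Metadata.partition_columns` is pre-existing API, not added by this PR):

```python
dt = DeltaTable(tmp_path)
assert dt.version() == 0
assert dt.metadata().partition_columns == ["utf8"]
```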