refactor: move checkpoint and errors into separate module (#1430)
# Description

While doing #1409 it became evident that our errors have grown somewhat
organically and could do with some pruning. At the same time we are
hoping to reorganize delta-rs a bit (#1136), to make it easier to
reason about.

This PR is a first attempt at introducing a more explicit approach to how
we model our errors. I'd like to propose we work towards the approach
taken in the object_store crate - specifically, keep very specific
errors in submodules, but do not surface those errors to the user;
rather, collapse them into (eventually) a single error type.

Since delta-rs does many more things than object_store, I believe we
will eventually end up with a few more top-level errors, or maybe a
two-level hierarchy.
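
To make this concrete, here is a minimal sketch of the pattern, assuming
the `thiserror` crate we already use - the variant set is deliberately
trimmed down, so treat the shape as illustrative of the direction rather
than the final API:

```rust
// Sketch only: a submodule owns its specific error type...
mod action {
    #[derive(thiserror::Error, Debug)]
    pub enum ProtocolError {
        /// The log contains no metadata action.
        #[error("Table metadata is missing")]
        NoMetaData,
        /// Catch-all for failures that don't warrant their own variant.
        #[error("Generic error: {0}")]
        Generic(String),
    }
}

// ...which is never surfaced to users directly; instead it collapses
// into the single crate-level error type via `From`.
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
    #[error("Protocol error: {source}")]
    Protocol {
        #[from]
        source: action::ProtocolError,
    },
}

fn load_metadata() -> Result<(), DeltaTableError> {
    let result: Result<(), action::ProtocolError> =
        Err(action::ProtocolError::NoMetaData);
    // `?` converts the submodule error into the crate-level error,
    // so callers only ever have to match on `DeltaTableError`.
    result?;
    Ok(())
}

fn main() {
    match load_metadata() {
        Ok(()) => println!("loaded"),
        Err(err) => println!("failed: {err}"),
    }
}
```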

As a first step in this PR:
- move `checkpoints.rs` into the `action` module (as proposed in #1136)
- move top-level errors into a new `errors` module - at least the ones
defined in `delta.rs`
- try to consolidate `ActionError` and `CheckpointError`, which are now
both part of the `action` module (sketched below)
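
For that last bullet, the shape of the change is easiest to see in this
excerpt (trimmed) from the new `rust/src/action/checkpoints.rs` further
down in the diff - `CheckpointError` becomes a private, checkpoint-only
error that folds into the shared `ProtocolError`:

```rust
impl From<CheckpointError> for ProtocolError {
    fn from(value: CheckpointError) -> Self {
        match value {
            CheckpointError::PartitionValueNotParseable(_) => {
                Self::InvalidField(value.to_string())
            }
            CheckpointError::Arrow { source } => Self::Arrow { source },
            CheckpointError::Parquet { source } => Self::ParquetParseError { source },
        }
    }
}
```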

As I needed to touch a bunch of imports anyhow, I took the liberty of
organizing them according to what is, to the best of my knowledge, the
[leading
contender](https://rust-lang.github.io/rustfmt/?version=v1.4.38&search=#StdExternalCrate%5C%3A)
for what rustfmt might do.
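
Concretely, that grouping (rustfmt's unstable `group_imports =
"StdExternalCrate"` option) arranges imports into three blocks -
standard library first, then external crates, then the current crate.
A representative example in the style used throughout this PR:

```rust
// 1. standard library
use std::collections::HashMap;
use std::convert::TryFrom;

// 2. external crates
use chrono::{DateTime, Utc};
use object_store::{path::Path, ObjectMeta, ObjectStore};

// 3. the current crate
use crate::schema::*;
use crate::table_state::DeltaTableState;
```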

@wjones127 @rtyler @Blajda - I'm opening this as a draft to get some
feedback, since this is a somewhat intrusive change to the overall
crate, in case you want to go another way.

# Related Issue(s)

part of: #1136

roeap authored Jun 3, 2023
1 parent 1a6064e commit 98af4dc
Showing 47 changed files with 702 additions and 618 deletions.
26 changes: 13 additions & 13 deletions python/src/error.rs
@@ -1,6 +1,6 @@
use arrow_schema::ArrowError;
use deltalake::checkpoints::CheckpointError;
use deltalake::{DeltaTableError, ObjectStoreError};
use deltalake::action::ProtocolError;
use deltalake::{errors::DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
};
@@ -59,16 +59,16 @@ fn arrow_to_py(err: ArrowError) -> PyErr {
}
}

fn checkpoint_to_py(err: CheckpointError) -> PyErr {
fn checkpoint_to_py(err: ProtocolError) -> PyErr {
match err {
CheckpointError::Io { source } => PyIOError::new_err(source.to_string()),
CheckpointError::Arrow { source } => arrow_to_py(source),
CheckpointError::DeltaTable { source } => inner_to_py_err(source),
CheckpointError::ObjectStore { source } => object_store_to_py(source),
CheckpointError::MissingMetaData => DeltaProtocolError::new_err("Table metadata missing"),
CheckpointError::PartitionValueNotParseable(err) => PyValueError::new_err(err),
CheckpointError::JSONSerialization { source } => PyValueError::new_err(source.to_string()),
CheckpointError::Parquet { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Arrow { source } => arrow_to_py(source),
ProtocolError::ObjectStore { source } => object_store_to_py(source),
ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::Generic(msg) => DeltaError::new_err(msg),
}
}

@@ -81,7 +81,7 @@ pub enum PythonError {
#[error("Error in arrow")]
Arrow(#[from] ArrowError),
#[error("Error in checkpoint")]
Checkpoint(#[from] CheckpointError),
Protocol(#[from] ProtocolError),
}

impl From<PythonError> for pyo3::PyErr {
@@ -90,7 +90,7 @@ impl From<PythonError> for pyo3::PyErr {
PythonError::DeltaTable(err) => inner_to_py_err(err),
PythonError::ObjectStore(err) => object_store_to_py(err),
PythonError::Arrow(err) => arrow_to_py(err),
PythonError::Checkpoint(err) => checkpoint_to_py(err),
PythonError::Protocol(err) => checkpoint_to_py(err),
}
}
}
5 changes: 3 additions & 2 deletions python/src/lib.rs
@@ -23,6 +23,7 @@ use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
@@ -174,7 +175,7 @@ impl RawDeltaTable {
&self,
partitions_filters: Vec<(&str, &str, PartitionFilterValue)>,
) -> PyResult<Vec<String>> {
let partition_filters: Result<Vec<PartitionFilter<&str>>, deltalake::DeltaTableError> =
let partition_filters: Result<Vec<PartitionFilter<&str>>, DeltaTableError> =
partitions_filters
.into_iter()
.map(|filter| match filter {
@@ -520,7 +521,7 @@ impl RawDeltaTable {

fn convert_partition_filters<'a>(
partitions_filters: Vec<(&'a str, &'a str, PartitionFilterValue<'a>)>,
) -> Result<Vec<PartitionFilter<&'a str>>, deltalake::DeltaTableError> {
) -> Result<Vec<PartitionFilter<&'a str>>, DeltaTableError> {
partitions_filters
.into_iter()
.map(|filter| match filter {
2 changes: 1 addition & 1 deletion rust/examples/basic_operations.rs
@@ -37,7 +37,7 @@ fn get_table_batches() -> RecordBatch {
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::DeltaTableError> {
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
// Create a delta operations client pointing at an un-initialized in-memory location.
// In a production environment this would be created with "try_new" and point at
// a real storage location.
2 changes: 1 addition & 1 deletion rust/examples/read_delta_table.rs
@@ -1,5 +1,5 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), deltalake::DeltaTableError> {
async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
let table_path = "./tests/data/delta-0.8.0";
let table = deltalake::open_table(table_path).await?;
println!("{table}");
1 change: 1 addition & 0 deletions rust/examples/recordbatch-writer.rs
@@ -10,6 +10,7 @@
use chrono::prelude::*;
use deltalake::arrow::array::*;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::errors::DeltaTableError;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::*;
use log::*;
123 changes: 47 additions & 76 deletions rust/src/checkpoints.rs → rust/src/action/checkpoints.rs
@@ -7,7 +7,7 @@ use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::*;
use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use regex::Regex;
@@ -17,89 +17,62 @@ use std::convert::TryFrom;
use std::iter::Iterator;
use std::ops::Add;

use super::action;
use super::delta_arrow::delta_log_schema_for_table;
use super::open_table_with_version;
use super::schema::*;
use super::storage::DeltaObjectStore;
use super::table_state::DeltaTableState;
use super::time_utils;
use super::DeltaTable;
use super::{CheckPoint, DeltaTableError};
use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn};
use crate::delta_arrow::delta_log_schema_for_table;
use crate::schema::*;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable};

type SchemaPath = Vec<String>;

/// Error returned when there is an error during creating a checkpoint.
#[derive(thiserror::Error, Debug)]
pub enum CheckpointError {
/// Error returned when the DeltaTableState does not contain a metadata action.
#[error("DeltaTableMetadata not present in DeltaTableState")]
MissingMetaData,
enum CheckpointError {
/// Error returned when a string formatted partition value cannot be parsed to its appropriate
/// data type.
#[error("Partition value {0} cannot be parsed from string.")]
PartitionValueNotParseable(String),
/// Passthrough error returned when calling DeltaTable.
#[error("DeltaTableError: {source}")]
DeltaTable {
/// The source DeltaTableError.
#[from]
source: DeltaTableError,
},

/// Error returned when the parquet writer fails while writing the checkpoint.
#[error("Failed to write parquet: {}", .source)]
Parquet {
/// Parquet error details returned when writing the checkpoint failed.
#[from]
source: ParquetError,
},

/// Error returned when converting the schema to Arrow format failed.
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
/// Arrow error details returned when converting the schema in Arrow format failed
#[from]
source: ArrowError,
},
/// Passthrough error returned when calling ObjectStore.
#[error("ObjectStoreError: {source}")]
ObjectStore {
/// The source ObjectStoreError.
#[from]
source: ObjectStoreError,
},
/// Passthrough error returned by serde_json.
#[error("serde_json::Error: {source}")]
JSONSerialization {
/// The source serde_json::Error.
#[from]
source: serde_json::Error,
},
/// Passthrough error returned when doing std::io operations
#[error("std::io::Error: {source}")]
Io {
/// The source std::io::Error
#[from]
source: std::io::Error,
},
}

/// The record batch size for checkpoint parquet file
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

impl From<CheckpointError> for ArrowError {
fn from(error: CheckpointError) -> Self {
ArrowError::from_external_error(Box::new(error))
impl From<CheckpointError> for ProtocolError {
fn from(value: CheckpointError) -> Self {
match value {
CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
CheckpointError::Arrow { source } => Self::Arrow { source },
CheckpointError::Parquet { source } => Self::ParquetParseError { source },
}
}
}

/// The record batch size for checkpoint parquet file
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

/// Creates checkpoint at current table version
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> {
create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?;

Ok(())
}

/// Delete expired log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError> {
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, ProtocolError> {
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
Expand All @@ -117,8 +90,10 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
table_uri: &str,
version: i64,
cleanup: Option<bool>,
) -> Result<(), CheckpointError> {
let table = open_table_with_version(table_uri, version).await?;
) -> Result<(), ProtocolError> {
let table = open_table_with_version(table_uri, version)
.await
.map_err(|err| ProtocolError::Generic(err.to_string()))?;
create_checkpoint_for(version, table.get_state(), table.storage.as_ref()).await?;

let enable_expired_log_cleanup =
@@ -136,7 +111,7 @@ async fn create_checkpoint_for(
version: i64,
state: &DeltaTableState,
storage: &DeltaObjectStore,
) -> Result<(), CheckpointError> {
) -> Result<(), ProtocolError> {
// TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
// an appropriate split point yet though so only writing a single part currently.
// See https://github.com/delta-io/delta-rs/issues/288
@@ -169,7 +144,7 @@ async fn flush_delete_files<T: Fn(&(i64, ObjectMeta)) -> bool>(
maybe_delete_files: &mut Vec<(i64, ObjectMeta)>,
files_to_delete: &mut Vec<(i64, ObjectMeta)>,
should_delete_file: T,
) -> Result<i32, DeltaTableError> {
) -> Result<i32, ProtocolError> {
if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) {
files_to_delete.append(maybe_delete_files);
}
@@ -179,7 +154,7 @@ async fn flush_delete_files<T: Fn(&(i64, ObjectMeta)) -> bool>(
.map(|file| async move {
match storage.delete(&file.1.location).await {
Ok(_) => Ok(1),
Err(e) => Err(DeltaTableError::from(e)),
Err(e) => Err(ProtocolError::from(e)),
}
})
.collect::<Vec<_>>();
@@ -202,7 +177,7 @@ pub async fn cleanup_expired_logs_for(
until_version: i64,
storage: &DeltaObjectStore,
log_retention_timestamp: i64,
) -> Result<i32, DeltaTableError> {
) -> Result<i32, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
@@ -306,10 +281,8 @@ pub async fn cleanup_expired_logs_for(
}
}

fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
let current_metadata = state
.current_metadata()
.ok_or(CheckpointError::MissingMetaData)?;
fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, ProtocolError> {
let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();

@@ -339,21 +312,21 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
}

// protocol
let jsons = std::iter::once(action::Action::protocol(action::Protocol {
let jsons = std::iter::once(Action::protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
}))
// metaData
.chain(std::iter::once(action::Action::metaData(
action::MetaData::try_from(current_metadata.clone())?,
)))
.chain(std::iter::once(Action::metaData(MetaData::try_from(
current_metadata.clone(),
)?)))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
action::Action::txn(action::Txn {
Action::txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
@@ -370,9 +343,9 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
r.extended_file_metadata = Some(false);
}

action::Action::remove(r)
Action::remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(|err| ArrowError::JsonError(err.to_string())))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(state.files().iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
@@ -392,7 +365,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons: Vec<serde_json::Value> = jsons.map(|r| r.unwrap()).collect();
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

while let Some(batch) = decoder.flush()? {
@@ -406,11 +379,11 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
}

fn checkpoint_add_from_state(
add: &action::Add,
add: &AddAction,
partition_col_data_types: &[(&str, &SchemaDataType)],
stats_conversions: &[(SchemaPath, SchemaDataType)],
) -> Result<Value, ArrowError> {
let mut v = serde_json::to_value(action::Action::add(add.clone()))
) -> Result<Value, ProtocolError> {
let mut v = serde_json::to_value(Action::add(add.clone()))
.map_err(|err| ArrowError::JsonError(err.to_string()))?;

v["add"]["dataChange"] = Value::Bool(false);
@@ -457,7 +430,7 @@ fn checkpoint_add_from_state(
fn typed_partition_value_from_string(
string_value: &str,
data_type: &SchemaDataType,
) -> Result<Value, CheckpointError> {
) -> Result<Value, ProtocolError> {
match data_type {
SchemaDataType::primitive(primitive_type) => match primitive_type.as_str() {
"string" | "binary" => Ok(string_value.to_owned().into()),
@@ -504,7 +477,7 @@ fn typed_partition_value_from_string(
fn typed_partition_value_from_option_string(
string_value: &Option<String>,
data_type: &SchemaDataType,
) -> Result<Value, CheckpointError> {
) -> Result<Value, ProtocolError> {
match string_value {
Some(s) => {
if s.is_empty() {
@@ -517,8 +490,6 @@ fn typed_partition_value_from_option_string(
}
}

type SchemaPath = Vec<String>;

fn collect_stats_conversions(
paths: &mut Vec<(SchemaPath, SchemaDataType)>,
fields: &[SchemaField],
