diff --git a/python/src/error.rs b/python/src/error.rs
index 3d86483a79..69c3d0fbaf 100644
--- a/python/src/error.rs
+++ b/python/src/error.rs
@@ -1,6 +1,6 @@
 use arrow_schema::ArrowError;
-use deltalake::checkpoints::CheckpointError;
-use deltalake::{DeltaTableError, ObjectStoreError};
+use deltalake::action::ProtocolError;
+use deltalake::{errors::DeltaTableError, ObjectStoreError};
 use pyo3::exceptions::{
     PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
 };
@@ -59,16 +59,16 @@ fn arrow_to_py(err: ArrowError) -> PyErr {
     }
 }
 
-fn checkpoint_to_py(err: CheckpointError) -> PyErr {
+fn checkpoint_to_py(err: ProtocolError) -> PyErr {
     match err {
-        CheckpointError::Io { source } => PyIOError::new_err(source.to_string()),
-        CheckpointError::Arrow { source } => arrow_to_py(source),
-        CheckpointError::DeltaTable { source } => inner_to_py_err(source),
-        CheckpointError::ObjectStore { source } => object_store_to_py(source),
-        CheckpointError::MissingMetaData => DeltaProtocolError::new_err("Table metadata missing"),
-        CheckpointError::PartitionValueNotParseable(err) => PyValueError::new_err(err),
-        CheckpointError::JSONSerialization { source } => PyValueError::new_err(source.to_string()),
-        CheckpointError::Parquet { source } => PyIOError::new_err(source.to_string()),
+        ProtocolError::Arrow { source } => arrow_to_py(source),
+        ProtocolError::ObjectStore { source } => object_store_to_py(source),
+        ProtocolError::NoMetaData => DeltaProtocolError::new_err("Table metadata missing"),
+        ProtocolError::InvalidField(err) => PyValueError::new_err(err),
+        ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
+        ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
+        ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
+        ProtocolError::Generic(msg) => DeltaError::new_err(msg),
     }
 }
 
@@ -81,7 +81,7 @@ pub enum PythonError {
     #[error("Error in arrow")]
     Arrow(#[from] ArrowError),
     #[error("Error in checkpoint")]
-    Checkpoint(#[from] CheckpointError),
+    Protocol(#[from] ProtocolError),
 }
 
 impl From<PythonError> for pyo3::PyErr {
@@ -90,7 +90,7 @@ impl From<PythonError> for pyo3::PyErr {
             PythonError::DeltaTable(err) => inner_to_py_err(err),
             PythonError::ObjectStore(err) => object_store_to_py(err),
             PythonError::Arrow(err) => arrow_to_py(err),
-            PythonError::Checkpoint(err) => checkpoint_to_py(err),
+            PythonError::Protocol(err) => checkpoint_to_py(err),
         }
     }
 }
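Reviewer note: the hunk above collapses the old `CheckpointError` arms into the flattened `ProtocolError` surface. For readers unfamiliar with pyo3 error bridging, here is a minimal, self-contained model of the pattern; both the enum and the helper below are stand-ins for illustration, not the crate's real definitions.

```rust
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::PyErr;

// Stand-in for deltalake::action::ProtocolError (illustration only).
enum ProtocolError {
    InvalidField(String),
    ParquetParseError { source: String },
}

// Route each protocol-level failure to the closest built-in Python exception.
fn protocol_to_py(err: ProtocolError) -> PyErr {
    match err {
        // Malformed fields are user-visible value problems.
        ProtocolError::InvalidField(msg) => PyValueError::new_err(msg),
        // Checkpoint parse failures surface as I/O errors.
        ProtocolError::ParquetParseError { source } => PyIOError::new_err(source),
    }
}
```

Keeping the match exhaustive, as the patch does, means a newly added error variant is a compile error here rather than a silently misrouted Python exception.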
diff --git a/python/src/lib.rs b/python/src/lib.rs
index b86f3b1c1e..8e2c4d282f 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -23,6 +23,7 @@ use deltalake::builder::DeltaTableBuilder;
 use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
+use deltalake::errors::DeltaTableError;
 use deltalake::operations::optimize::OptimizeBuilder;
 use deltalake::operations::transaction::commit;
 use deltalake::operations::vacuum::VacuumBuilder;
@@ -174,7 +175,7 @@ impl RawDeltaTable {
         &self,
         partitions_filters: Vec<(&str, &str, PartitionFilterValue)>,
     ) -> PyResult<Vec<String>> {
-        let partition_filters: Result<Vec<PartitionFilter<&str>>, deltalake::DeltaTableError> =
+        let partition_filters: Result<Vec<PartitionFilter<&str>>, DeltaTableError> =
             partitions_filters
                 .into_iter()
                 .map(|filter| match filter {
@@ -520,7 +521,7 @@ impl RawDeltaTable {
 fn convert_partition_filters<'a>(
     partitions_filters: Vec<(&'a str, &'a str, PartitionFilterValue<'a>)>,
-) -> Result<Vec<PartitionFilter<'a, &'a str>>, deltalake::DeltaTableError> {
+) -> Result<Vec<PartitionFilter<'a, &'a str>>, DeltaTableError> {
     partitions_filters
         .into_iter()
         .map(|filter| match filter {
diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs
index 2b50f9968a..8aa91a6cb3 100644
--- a/rust/examples/basic_operations.rs
+++ b/rust/examples/basic_operations.rs
@@ -37,7 +37,7 @@ fn get_table_batches() -> RecordBatch {
 }
 
 #[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<(), deltalake::DeltaTableError> {
+async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
     // Create a delta operations client pointing at an un-initialized in-memory location.
     // In a production environment this would be created with "try_new" and point at
     // a real storage location.
diff --git a/rust/examples/read_delta_table.rs b/rust/examples/read_delta_table.rs
index c225846fa4..9c0c60ef1a 100644
--- a/rust/examples/read_delta_table.rs
+++ b/rust/examples/read_delta_table.rs
@@ -1,5 +1,5 @@
 #[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<(), deltalake::DeltaTableError> {
+async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
     let table_path = "./tests/data/delta-0.8.0";
     let table = deltalake::open_table(table_path).await?;
     println!("{table}");
diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs
index 4b13b26815..ab61e93b1d 100644
--- a/rust/examples/recordbatch-writer.rs
+++ b/rust/examples/recordbatch-writer.rs
@@ -10,6 +10,7 @@ use chrono::prelude::*;
 use deltalake::arrow::array::*;
 use deltalake::arrow::record_batch::RecordBatch;
+use deltalake::errors::DeltaTableError;
 use deltalake::writer::{DeltaWriter, RecordBatchWriter};
 use deltalake::*;
 use log::*;
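Reviewer note: the example updates above are purely a path change. For downstream code the migration is equally mechanical; a sketch mirroring read_delta_table.rs (the `pub use errors::*` re-export added to lib.rs later in this diff means the old root path should also keep compiling):

```rust
// The error type now lives in the explicit `errors` module.
use deltalake::errors::DeltaTableError;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), DeltaTableError> {
    // Same test table the example above reads.
    let table = deltalake::open_table("./tests/data/delta-0.8.0").await?;
    println!("{table}");
    Ok(())
}
```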
#[error("Partition value {0} cannot be parsed from string.")] PartitionValueNotParseable(String), - /// Passthrough error returned when calling DeltaTable. - #[error("DeltaTableError: {source}")] - DeltaTable { - /// The source DeltaTableError. - #[from] - source: DeltaTableError, - }, + /// Error returned when the parquet writer fails while writing the checkpoint. #[error("Failed to write parquet: {}", .source)] Parquet { @@ -51,6 +41,7 @@ pub enum CheckpointError { #[from] source: ParquetError, }, + /// Error returned when converting the schema to Arrow format failed. #[error("Failed to convert into Arrow schema: {}", .source)] Arrow { @@ -58,48 +49,30 @@ pub enum CheckpointError { #[from] source: ArrowError, }, - /// Passthrough error returned when calling ObjectStore. - #[error("ObjectStoreError: {source}")] - ObjectStore { - /// The source ObjectStoreError. - #[from] - source: ObjectStoreError, - }, - /// Passthrough error returned by serde_json. - #[error("serde_json::Error: {source}")] - JSONSerialization { - /// The source serde_json::Error. - #[from] - source: serde_json::Error, - }, - /// Passthrough error returned when doing std::io operations - #[error("std::io::Error: {source}")] - Io { - /// The source std::io::Error - #[from] - source: std::io::Error, - }, } -/// The record batch size for checkpoint parquet file -pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; - -impl From for ArrowError { - fn from(error: CheckpointError) -> Self { - ArrowError::from_external_error(Box::new(error)) +impl From for ProtocolError { + fn from(value: CheckpointError) -> Self { + match value { + CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()), + CheckpointError::Arrow { source } => Self::Arrow { source }, + CheckpointError::Parquet { source } => Self::ParquetParseError { source }, + } } } +/// The record batch size for checkpoint parquet file +pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000; + /// Creates checkpoint at current table version -pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> { +pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> { create_checkpoint_for(table.version(), table.get_state(), table.storage.as_ref()).await?; - Ok(()) } /// Delete expires log files before given version from table. The table log retention is based on /// the `logRetentionDuration` property of the Delta Table, 30 days by default. -pub async fn cleanup_metadata(table: &DeltaTable) -> Result { +pub async fn cleanup_metadata(table: &DeltaTable) -> Result { let log_retention_timestamp = Utc::now().timestamp_millis() - table.get_state().log_retention_millis(); cleanup_expired_logs_for( @@ -117,8 +90,10 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup( table_uri: &str, version: i64, cleanup: Option, -) -> Result<(), CheckpointError> { - let table = open_table_with_version(table_uri, version).await?; +) -> Result<(), ProtocolError> { + let table = open_table_with_version(table_uri, version) + .await + .map_err(|err| ProtocolError::Generic(err.to_string()))?; create_checkpoint_for(version, table.get_state(), table.storage.as_ref()).await?; let enable_expired_log_cleanup = @@ -136,7 +111,7 @@ async fn create_checkpoint_for( version: i64, state: &DeltaTableState, storage: &DeltaObjectStore, -) -> Result<(), CheckpointError> { +) -> Result<(), ProtocolError> { // TODO: checkpoints _can_ be multi-part... 
haven't actually found a good reference for // an appropriate split point yet though so only writing a single part currently. // See https://github.com/delta-io/delta-rs/issues/288 @@ -169,7 +144,7 @@ async fn flush_delete_files bool>( maybe_delete_files: &mut Vec<(i64, ObjectMeta)>, files_to_delete: &mut Vec<(i64, ObjectMeta)>, should_delete_file: T, -) -> Result { +) -> Result { if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) { files_to_delete.append(maybe_delete_files); } @@ -179,7 +154,7 @@ async fn flush_delete_files bool>( .map(|file| async move { match storage.delete(&file.1.location).await { Ok(_) => Ok(1), - Err(e) => Err(DeltaTableError::from(e)), + Err(e) => Err(ProtocolError::from(e)), } }) .collect::>(); @@ -202,7 +177,7 @@ pub async fn cleanup_expired_logs_for( until_version: i64, storage: &DeltaObjectStore, log_retention_timestamp: i64, -) -> Result { +) -> Result { lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap(); @@ -306,10 +281,8 @@ pub async fn cleanup_expired_logs_for( } } -fn parquet_bytes_from_state(state: &DeltaTableState) -> Result { - let current_metadata = state - .current_metadata() - .ok_or(CheckpointError::MissingMetaData)?; +fn parquet_bytes_from_state(state: &DeltaTableState) -> Result { + let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?; let partition_col_data_types = current_metadata.get_partition_col_data_types(); @@ -339,21 +312,21 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result Result Result = jsons.map(|r| r.unwrap()).collect(); + let jsons = jsons.collect::, _>>()?; decoder.serialize(&jsons)?; while let Some(batch) = decoder.flush()? { @@ -406,11 +379,11 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result Result { - let mut v = serde_json::to_value(action::Action::add(add.clone())) +) -> Result { + let mut v = serde_json::to_value(Action::add(add.clone())) .map_err(|err| ArrowError::JsonError(err.to_string()))?; v["add"]["dataChange"] = Value::Bool(false); @@ -457,7 +430,7 @@ fn checkpoint_add_from_state( fn typed_partition_value_from_string( string_value: &str, data_type: &SchemaDataType, -) -> Result { +) -> Result { match data_type { SchemaDataType::primitive(primitive_type) => match primitive_type.as_str() { "string" | "binary" => Ok(string_value.to_owned().into()), @@ -504,7 +477,7 @@ fn typed_partition_value_from_string( fn typed_partition_value_from_option_string( string_value: &Option, data_type: &SchemaDataType, -) -> Result { +) -> Result { match string_value { Some(s) => { if s.is_empty() { @@ -517,8 +490,6 @@ fn typed_partition_value_from_option_string( } } -type SchemaPath = Vec; - fn collect_stats_conversions( paths: &mut Vec<(SchemaPath, SchemaDataType)>, fields: &[SchemaField], diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index e246bfee97..39ae3db473 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -2,14 +2,16 @@ #![allow(non_camel_case_types)] -#[cfg(feature = "parquet")] -mod parquet_read; - +#[cfg(all(feature = "arrow", feature = "parquet"))] +pub mod checkpoints; #[cfg(feature = "parquet2")] pub mod parquet2_read; +#[cfg(feature = "parquet")] +mod parquet_read; -use crate::delta_config::IsolationLevel; -use crate::{schema::*, DeltaResult, DeltaTableError, DeltaTableMetaData}; +#[cfg(all(feature = "arrow"))] +use arrow_schema::ArrowError; +use object_store::Error as ObjectStoreError; use percent_encoding::percent_decode; 
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
@@ -17,26 +19,38 @@ use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::hash::{Hash, Hasher};
 
+use crate::delta_config::IsolationLevel;
+use crate::errors::DeltaResult;
+use crate::{schema::*, DeltaTableMetaData};
+
 /// Error returned when an invalid Delta log action is encountered.
+#[allow(missing_docs)]
 #[derive(thiserror::Error, Debug)]
-pub enum ActionError {
+pub enum ProtocolError {
+    #[error("Table state does not contain metadata")]
+    NoMetaData,
+
     /// The action contains an invalid field.
     #[error("Invalid action field: {0}")]
     InvalidField(String),
+
     /// A parquet log checkpoint file contains an invalid action.
     #[error("Invalid action in parquet row: {0}")]
     InvalidRow(String),
+
     /// A generic action error. The wrapped error string describes the details.
     #[error("Generic action error: {0}")]
     Generic(String),
+
+    #[cfg(feature = "parquet2")]
     #[error("Failed to parse parquet checkpoint: {}", .source)]
     /// Error returned when parsing checkpoint parquet using the parquet2 crate.
     ParquetParseError {
         /// Parquet error details returned when parsing the checkpoint parquet
         #[from]
-        source: parquet2_read::ParseError,
+        source: parquet2::error::Error,
     },
+
+    #[cfg(feature = "parquet")]
     #[error("Failed to parse parquet checkpoint: {}", .source)]
     /// Error returned when parsing checkpoint parquet using the parquet crate.
@@ -45,6 +59,7 @@ pub enum ActionError {
         #[from]
         source: parquet::errors::ParquetError,
     },
+
     /// Failed to serialize operation
     #[error("Failed to serialize operation: {source}")]
     SerializeOperation {
@@ -52,13 +67,30 @@ pub enum ActionError {
         /// The source error
         source: serde_json::Error,
     },
+
+    /// Error returned when converting the schema to Arrow format failed.
+    #[cfg(all(feature = "arrow"))]
+    #[error("Failed to convert into Arrow schema: {}", .source)]
+    Arrow {
+        /// Arrow error details returned when converting the schema in Arrow format failed
+        #[from]
+        source: ArrowError,
+    },
+
+    /// Passthrough error returned when calling ObjectStore.
+    #[error("ObjectStoreError: {source}")]
+    ObjectStore {
+        /// The source ObjectStoreError.
+        #[from]
+        source: ObjectStoreError,
+    },
 }
 
-fn decode_path(raw_path: &str) -> Result<String, ActionError> {
+fn decode_path(raw_path: &str) -> Result<String, ProtocolError> {
     percent_decode(raw_path.as_bytes())
         .decode_utf8()
         .map(|c| c.to_string())
-        .map_err(|e| ActionError::InvalidField(format!("Decode path failed for action: {e}")))
+        .map_err(|e| ProtocolError::InvalidField(format!("Decode path failed for action: {e}")))
 }
 
 /// Struct used to represent minValues and maxValues in add action statistics.
@@ -242,7 +274,7 @@ impl Hash for Add {
 
 impl Add {
     /// Returns the Add action with path decoded.
-    pub fn path_decoded(self) -> Result<Self, ActionError> {
+    pub fn path_decoded(self) -> Result<Self, ProtocolError> {
         decode_path(&self.path).map(|path| Self { path, ..self })
     }
 
@@ -341,11 +373,11 @@ impl MetaData {
 }
 
 impl TryFrom<DeltaTableMetaData> for MetaData {
-    type Error = DeltaTableError;
+    type Error = ProtocolError;
 
     fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
         let schema_string = serde_json::to_string(&metadata.schema)
-            .map_err(|e| DeltaTableError::SerializeSchemaJson { json_err: e })?;
+            .map_err(|source| ProtocolError::SerializeOperation { source })?;
         Ok(Self {
             id: metadata.id,
             name: metadata.name,
@@ -412,7 +444,7 @@ impl PartialEq for Remove {
 
 impl Remove {
     /// Returns the Remove action with path decoded.
-    pub fn path_decoded(self) -> Result<Self, ActionError> {
+    pub fn path_decoded(self) -> Result<Self, ProtocolError> {
         decode_path(&self.path).map(|path| Self { path, ..self })
     }
 }
@@ -597,7 +629,7 @@ impl DeltaOperation {
     /// Parameters configured for operation.
     pub fn operation_parameters(&self) -> DeltaResult<HashMap<String, Value>> {
         if let Some(Some(Some(map))) = serde_json::to_value(self)
-            .map_err(|err| ActionError::SerializeOperation { source: err })?
+            .map_err(|err| ProtocolError::SerializeOperation { source: err })?
             .as_object()
             .map(|p| p.values().next().map(|q| q.as_object()))
         {
@@ -616,7 +648,7 @@
             })
             .collect())
         } else {
-            Err(ActionError::Generic(
+            Err(ProtocolError::Generic(
                 "Operation parameters serialized into unexpected shape".into(),
             )
             .into())
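Reviewer note: the next file adds `impl From<ParseError> for ProtocolError`, which is what the new `.into()` at the `Err` return site further down relies on. A self-contained sketch of that widen-at-the-boundary pattern, using stand-in enums rather than the crate's definitions:

```rust
// Stand-in types for illustration only.
#[derive(Debug)]
enum ParseError {
    InvalidAction(String),
}

#[derive(Debug)]
enum ProtocolError {
    InvalidRow(String),
}

// Map the narrow parser error onto the broader protocol error.
impl From<ParseError> for ProtocolError {
    fn from(value: ParseError) -> Self {
        match value {
            ParseError::InvalidAction(msg) => Self::InvalidRow(msg),
        }
    }
}

fn check(action: &str) -> Result<(), ProtocolError> {
    if action != "add" {
        // Build the specific error, widen it at the return boundary.
        return Err(ParseError::InvalidAction(action.to_string()).into());
    }
    Ok(())
}
```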
diff --git a/rust/src/action/parquet2_read/mod.rs b/rust/src/action/parquet2_read/mod.rs
index 7c7519344a..afa6065279 100644
--- a/rust/src/action/parquet2_read/mod.rs
+++ b/rust/src/action/parquet2_read/mod.rs
@@ -10,6 +10,14 @@ use parquet2::read::decompress;
 use parquet2::read::get_page_iterator;
 use parquet2::read::levels::get_bit_width;
 
+use super::ProtocolError;
+use crate::action::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn};
+use crate::schema::Guid;
+use boolean::for_each_boolean_field_value;
+use map::for_each_map_field_value;
+use primitive::for_each_primitive_field_value;
+use string::{for_each_repeated_string_field_value, for_each_string_field_value};
+
 mod boolean;
 mod dictionary;
 mod map;
@@ -18,13 +26,6 @@ mod stats;
 mod string;
 mod validity;
 
-use crate::action::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn};
-use crate::schema::Guid;
-use boolean::for_each_boolean_field_value;
-use map::for_each_map_field_value;
-use primitive::for_each_primitive_field_value;
-use string::{for_each_repeated_string_field_value, for_each_string_field_value};
-
 /// Parquet deserialization error
 #[derive(thiserror::Error, Debug)]
 pub enum ParseError {
@@ -43,6 +44,16 @@ pub enum ParseError {
     },
 }
 
+impl From<ParseError> for ProtocolError {
+    fn from(value: ParseError) -> Self {
+        match value {
+            ParseError::Generic(msg) => Self::Generic(msg),
+            ParseError::InvalidAction(msg) => Self::InvalidRow(msg),
+            ParseError::Parquet { source } => Self::ParquetParseError { source },
+        }
+    }
+}
+
 #[derive(Default)]
 struct DeserState {
     add_partition_values: map::MapState,
@@ -642,7 +653,7 @@ const MAX_PARQUET_HEADER_SIZE: usize = usize::MAX;
 pub fn actions_from_row_group<R: std::io::Read + std::io::Seek>(
     row_group: parquet2::metadata::RowGroupMetaData,
     reader: &mut R,
-) -> Result<Vec<Action>, ParseError> {
+) -> Result<Vec<Action>, ProtocolError> {
     let row_count = row_group.num_rows();
     // TODO: reuse actions buffer
     let mut actions: Vec<Option<Action>> = vec![None; row_count as usize];
@@ -664,7 +675,8 @@ pub fn actions_from_row_group<R: std::io::Read + std::io::Seek>(
             return Err(ParseError::InvalidAction(format!(
                 "unexpected action: {}",
                 &schema_path[0]
-            )));
+            ))
+            .into());
         }
     };
     let field = &schema_path[1..];
diff --git a/rust/src/action/parquet2_read/stats.rs b/rust/src/action/parquet2_read/stats.rs
index b67da87c05..c9bb2f9bdc 100644
--- a/rust/src/action/parquet2_read/stats.rs
+++ b/rust/src/action/parquet2_read/stats.rs
@@ -1,9 +1,9 @@
-use crate::action::{ActionError, Add, Stats};
+use crate::action::{Add, ProtocolError, Stats};
 
 impl Add {
     /// Returns the composite HashMap representation of stats contained in the action if present.
     /// Since stats are defined as optional in the protocol, this may be None.
- pub fn get_stats_parsed(&self) -> Result, ActionError> { + pub fn get_stats_parsed(&self) -> Result, ProtocolError> { Ok(None) } } diff --git a/rust/src/action/parquet_read/mod.rs b/rust/src/action/parquet_read/mod.rs index ccdfba2423..15737a1761 100644 --- a/rust/src/action/parquet_read/mod.rs +++ b/rust/src/action/parquet_read/mod.rs @@ -1,12 +1,13 @@ +use std::collections::HashMap; + use chrono::{SecondsFormat, TimeZone, Utc}; use num_bigint::BigInt; use num_traits::cast::ToPrimitive; use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor}; use serde_json::json; -use std::collections::HashMap; use crate::action::{ - Action, ActionError, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol, + Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol, ProtocolError, Remove, Stats, Txn, }; @@ -28,14 +29,14 @@ fn populate_hashmap_with_option_from_parquet_map( Ok(()) } -fn gen_action_type_error(action: &str, field: &str, expected_type: &str) -> ActionError { - ActionError::InvalidField(format!( +fn gen_action_type_error(action: &str, field: &str, expected_type: &str) -> ProtocolError { + ProtocolError::InvalidField(format!( "type for {field} in {action} action should be {expected_type}" )) } impl AddCDCFile { - fn from_parquet_record(_record: &parquet::record::Row) -> Result { + fn from_parquet_record(_record: &parquet::record::Row) -> Result { let re = Self { ..Default::default() }; @@ -44,7 +45,7 @@ impl AddCDCFile { } impl Add { - fn from_parquet_record(record: &parquet::record::Row) -> Result { + fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { ..Default::default() }; @@ -81,7 +82,7 @@ impl Add { parquet_map, ) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid partitionValues for add action: {estr}", )) })?; @@ -101,7 +102,7 @@ impl Add { let mut tags = HashMap::new(); populate_hashmap_with_option_from_parquet_map(&mut tags, tags_map) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid tags for add action: {estr}", )) })?; @@ -142,7 +143,7 @@ impl Add { /// Returns the composite HashMap representation of stats contained in the action if present. /// Since stats are defined as optional in the protocol, this may be None. 
- pub fn get_stats_parsed(&self) -> Result, ActionError> { + pub fn get_stats_parsed(&self) -> Result, ProtocolError> { self.stats_parsed.as_ref().map_or(Ok(None), |record| { let mut stats = Stats::default(); @@ -304,7 +305,7 @@ fn convert_date_to_string(value: i32) -> Result { } impl MetaData { - fn from_parquet_record(record: &parquet::record::Row) -> Result { + fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { ..Default::default() }; @@ -365,7 +366,7 @@ impl MetaData { configuration_map, ) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid configuration for metaData action: {estr}", )) })?; @@ -389,7 +390,7 @@ impl MetaData { options_map, ) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid format.options for metaData action: {estr}", )) })?; @@ -415,7 +416,7 @@ impl MetaData { } impl Remove { - fn from_parquet_record(record: &parquet::record::Row) -> Result { + fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { data_change: true, extended_file_metadata: Some(false), @@ -454,7 +455,7 @@ impl Remove { parquet_map, ) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid partitionValues for remove action: {estr}", )) })?; @@ -467,7 +468,7 @@ impl Remove { let mut tags = HashMap::new(); populate_hashmap_with_option_from_parquet_map(&mut tags, tags_map) .map_err(|estr| { - ActionError::InvalidField(format!( + ProtocolError::InvalidField(format!( "Invalid tags for remove action: {estr}", )) })?; @@ -496,7 +497,7 @@ impl Remove { } impl Txn { - fn from_parquet_record(record: &parquet::record::Row) -> Result { + fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { ..Default::default() }; @@ -532,7 +533,7 @@ impl Txn { } impl Protocol { - fn from_parquet_record(record: &parquet::record::Row) -> Result { + fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { ..Default::default() }; @@ -569,7 +570,7 @@ impl Action { pub fn from_parquet_record( schema: &parquet::schema::types::Type, record: &parquet::record::Row, - ) -> Result { + ) -> Result { // find column that's not none let (col_idx, col_data) = { let mut col_idx = None; @@ -589,7 +590,7 @@ impl Action { match (col_idx, col_data) { (Some(idx), Some(group)) => (idx, group), _ => { - return Err(ActionError::InvalidRow( + return Err(ProtocolError::InvalidRow( "Parquet action row only contains null columns".to_string(), )); } @@ -607,7 +608,7 @@ impl Action { "protocol" => Action::protocol(Protocol::from_parquet_record(col_data)?), "cdc" => Action::cdc(AddCDCFile::from_parquet_record(col_data)?), name => { - return Err(ActionError::InvalidField(format!( + return Err(ProtocolError::InvalidField(format!( "Unexpected action from checkpoint: {name}", ))); } diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 641aca6aaf..79dd1faa57 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -4,15 +4,16 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use crate::delta::{DeltaResult, DeltaTable, DeltaTableError}; -use crate::storage::config::StorageOptions; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; - use chrono::{DateTime, FixedOffset, Utc}; use object_store::DynObjectStore; use serde::{Deserialize, Serialize}; use url::Url; +use crate::delta::DeltaTable; +use crate::errors::{DeltaResult, DeltaTableError}; +use 
crate::storage::config::StorageOptions; +use crate::storage::{DeltaObjectStore, ObjectStoreRef}; + #[allow(dead_code)] #[derive(Debug, thiserror::Error)] enum BuilderError { diff --git a/rust/src/data_catalog/storage/mod.rs b/rust/src/data_catalog/storage/mod.rs index bcdd85603e..37083411a5 100644 --- a/rust/src/data_catalog/storage/mod.rs +++ b/rust/src/data_catalog/storage/mod.rs @@ -12,8 +12,9 @@ use datafusion_common::DataFusionError; use futures::TryStreamExt; use object_store::ObjectStore; +use crate::errors::DeltaResult; use crate::storage::config::{configure_store, StorageOptions}; -use crate::{ensure_table_uri, open_table_with_storage_options, DeltaResult}; +use crate::{ensure_table_uri, open_table_with_storage_options}; const DELTA_LOG_FOLDER: &str = "_delta_log"; diff --git a/rust/src/delta.rs b/rust/src/delta.rs index cdbf1cc52f..3c5320c9f5 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -10,17 +10,6 @@ use std::io::{BufRead, BufReader, Cursor}; use std::sync::Arc; use std::{cmp::max, cmp::Ordering, collections::HashSet}; -use super::action; -use super::action::{Action, DeltaOperation}; -use super::partitions::PartitionFilter; -use super::schema::*; -use super::table_state::DeltaTableState; -use crate::action::{Add, Stats}; -use crate::delta_config::DeltaConfigError; -use crate::operations::transaction::TransactionError; -use crate::operations::vacuum::VacuumBuilder; -use crate::storage::{commit_uri_from_version, ObjectStoreRef}; - use chrono::{DateTime, Duration, Utc}; use futures::StreamExt; use lazy_static::lazy_static; @@ -33,6 +22,16 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_json::{Map, Value}; use uuid::Uuid; +use super::action; +use super::action::{Action, DeltaOperation}; +use super::partitions::PartitionFilter; +use super::schema::*; +use super::table_state::DeltaTableState; +use crate::action::{Add, Stats}; +use crate::errors::{ApplyLogError, DeltaTableError, LoadCheckpointError}; +use crate::operations::vacuum::VacuumBuilder; +use crate::storage::{commit_uri_from_version, ObjectStoreRef}; + // TODO re-exports only for transition pub use crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; @@ -64,213 +63,6 @@ impl PartialEq for CheckPoint { impl Eq for CheckPoint {} -/// A result returned by delta-rs -pub type DeltaResult = Result; - -/// Delta Table specific error -#[derive(thiserror::Error, Debug)] -pub enum DeltaTableError { - /// Error returned when applying transaction log failed. - #[error("Failed to apply transaction log: {}", .source)] - ApplyLog { - /// Apply error details returned when applying transaction log failed. - #[from] - source: ApplyLogError, - }, - /// Error returned when loading checkpoint failed. - #[error("Failed to load checkpoint: {}", .source)] - LoadCheckpoint { - /// Load checkpoint error details returned when loading checkpoint failed. - #[from] - source: LoadCheckpointError, - }, - /// Error returned when reading the delta log object failed. - #[error("Failed to read delta log object: {}", .source)] - ObjectStore { - /// Storage error details when reading the delta log object failed. - #[from] - source: ObjectStoreError, - }, - - /// Error returned when parsing checkpoint parquet. - #[cfg(any(feature = "parquet", feature = "parquet2"))] - #[error("Failed to parse parquet: {}", .source)] - Parquet { - /// Parquet error details returned when reading the checkpoint failed. 
- #[cfg(feature = "parquet")] - #[from] - source: parquet::errors::ParquetError, - /// Parquet error details returned when reading the checkpoint failed. - #[cfg(feature = "parquet2")] - #[from] - source: parquet2::error::Error, - }, - - /// Error returned when converting the schema in Arrow format failed. - #[cfg(feature = "arrow")] - #[error("Failed to convert into Arrow schema: {}", .source)] - Arrow { - /// Arrow error details returned when converting the schema in Arrow format failed - #[from] - source: arrow::error::ArrowError, - }, - - /// Error returned when the log record has an invalid JSON. - #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)] - InvalidJsonLog { - /// JSON error details returned when parsing the record JSON. - json_err: serde_json::error::Error, - /// invalid log entry content. - line: String, - /// corresponding table version for the log file. - version: i64, - }, - /// Error returned when the log contains invalid stats JSON. - #[error("Invalid JSON in file stats: {}", .json_err)] - InvalidStatsJson { - /// JSON error details returned when parsing the stats JSON. - json_err: serde_json::error::Error, - }, - /// Error returned when the log contains invalid stats JSON. - #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")] - InvalidInvariantJson { - /// JSON error details returned when parsing the invariant expression JSON. - json_err: serde_json::error::Error, - /// Invariant expression. - line: String, - }, - /// Error returned when the DeltaTable has an invalid version. - #[error("Invalid table version: {0}")] - InvalidVersion(i64), - /// Error returned when the DeltaTable has no data files. - #[error("Corrupted table, cannot read data file {}: {}", .path, .source)] - MissingDataFile { - /// Source error details returned when the DeltaTable has no data files. - source: std::io::Error, - /// The Path used of the DeltaTable - path: String, - }, - /// Error returned when the datetime string is invalid for a conversion. - #[error("Invalid datetime string: {}", .source)] - InvalidDateTimeString { - /// Parse error details returned of the datetime string parse error. - #[from] - source: chrono::ParseError, - }, - /// Error returned when the action record is invalid in log. - #[error("Invalid action record found in log: {}", .source)] - InvalidAction { - /// Action error details returned of the invalid action. - #[from] - source: action::ActionError, - }, - /// Error returned when attempting to write bad data to the table - #[error("Attempted to write invalid data to the table: {:#?}", violations)] - InvalidData { - /// Action error details returned of the invalid action. - violations: Vec, - }, - /// Error returned when it is not a DeltaTable. - #[error("Not a Delta table: {0}")] - NotATable(String), - - /// Error returned when no metadata was found in the DeltaTable. - #[error("No metadata found, please make sure table is loaded.")] - NoMetadata, - /// Error returned when no schema was found in the DeltaTable. - #[error("No schema found, please make sure table is loaded.")] - NoSchema, - /// Error returned when no partition was found in the DeltaTable. 
- #[error("No partitions found, please make sure table is partitioned.")] - LoadPartitions, - - /// Error returned when writes are attempted with data that doesn't match the schema of the - /// table - #[error("Data does not match the schema or partitions of the table: {}", msg)] - SchemaMismatch { - /// Information about the mismatch - msg: String, - }, - - /// Error returned when a partition is not formatted as a Hive Partition. - #[error("This partition is not formatted with key=value: {}", .partition)] - PartitionError { - /// The malformed partition used. - partition: String, - }, - /// Error returned when a invalid partition filter was found. - #[error("Invalid partition filter found: {}.", .partition_filter)] - InvalidPartitionFilter { - /// The invalid partition filter used. - partition_filter: String, - }, - /// Error returned when a partition filter uses a nonpartitioned column. - #[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)] - ColumnsNotPartitioned { - /// The columns used in the partition filter that is not partitioned - nonpartitioned_columns: Vec, - }, - /// Error returned when a line from log record is invalid. - #[error("Failed to read line from log record")] - Io { - /// Source error details returned while reading the log record. - #[from] - source: std::io::Error, - }, - /// Error raised while commititng transaction - #[error("Transaction failed: {source}")] - Transaction { - /// The source error - source: TransactionError, - }, - /// Error returned when transaction is failed to be committed because given version already exists. - #[error("Delta transaction failed, version {0} already exists.")] - VersionAlreadyExists(i64), - /// Error returned when user attempts to commit actions that don't belong to the next version. - #[error("Delta transaction failed, version {0} does not follow {1}")] - VersionMismatch(i64, i64), - /// A Feature is missing to perform operation - #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")] - MissingFeature { - /// Name of the missing feature - feature: &'static str, - /// Storage location url - url: String, - }, - /// A Feature is missing to perform operation - #[error("Cannot infer storage location from: {0}")] - InvalidTableLocation(String), - /// Generic Delta Table error - #[error("Log JSON serialization error: {json_err}")] - SerializeLogJson { - /// JSON serialization error - json_err: serde_json::error::Error, - }, - /// Generic Delta Table error - #[error("Schema JSON serialization error: {json_err}")] - SerializeSchemaJson { - /// JSON serialization error - json_err: serde_json::error::Error, - }, - /// Generic Delta Table error - #[error("Generic DeltaTable error: {0}")] - Generic(String), - /// Generic Delta Table error - #[error("Generic error: {source}")] - GenericError { - /// Source error - source: Box, - }, -} - -impl From for DeltaTableError { - fn from(err: object_store::path::Error) -> Self { - Self::GenericError { - source: Box::new(err), - } - } -} - /// Delta table metadata #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct DeltaTableMetaData { @@ -371,87 +163,6 @@ impl TryFrom for DeltaTableMetaData { } } -/// Error related to Delta log application -#[derive(thiserror::Error, Debug)] -pub enum ApplyLogError { - /// Error returned when the end of transaction log is reached. - #[error("End of transaction log")] - EndOfLog, - /// Error returned when the JSON of the log record is invalid. 
- #[error("Invalid JSON found when applying log record")] - InvalidJson { - /// JSON error details returned when reading the JSON log record. - #[from] - source: serde_json::error::Error, - }, - /// Error returned when the storage failed to read the log content. - #[error("Failed to read log content")] - Storage { - /// Storage error details returned while reading the log content. - source: ObjectStoreError, - }, - /// Error returned when reading delta config failed. - #[error("Failed to read delta config: {}", .source)] - Config { - /// Delta config error returned when reading delta config failed. - #[from] - source: DeltaConfigError, - }, - /// Error returned when a line from log record is invalid. - #[error("Failed to read line from log record")] - Io { - /// Source error details returned while reading the log record. - #[from] - source: std::io::Error, - }, - /// Error returned when the action record is invalid in log. - #[error("Invalid action record found in log: {}", .source)] - InvalidAction { - /// Action error details returned of the invalid action. - #[from] - source: action::ActionError, - }, -} - -impl From for ApplyLogError { - fn from(error: ObjectStoreError) -> Self { - match error { - ObjectStoreError::NotFound { .. } => ApplyLogError::EndOfLog, - _ => ApplyLogError::Storage { source: error }, - } - } -} - -/// Error related to checkpoint loading -#[derive(thiserror::Error, Debug)] -pub enum LoadCheckpointError { - /// Error returned when the JSON checkpoint is not found. - #[error("Checkpoint file not found")] - NotFound, - /// Error returned when the JSON checkpoint is invalid. - #[error("Invalid JSON in checkpoint: {source}")] - InvalidJson { - /// Error details returned while reading the JSON. - #[from] - source: serde_json::error::Error, - }, - /// Error returned when it failed to read the checkpoint content. - #[error("Failed to read checkpoint content: {source}")] - Storage { - /// Storage error details returned while reading the checkpoint content. - source: ObjectStoreError, - }, -} - -impl From for LoadCheckpointError { - fn from(error: ObjectStoreError) -> Self { - match error { - ObjectStoreError::NotFound { .. 
} => LoadCheckpointError::NotFound, - _ => LoadCheckpointError::Storage { source: error }, - } - } -} - /// The next commit that's available from underlying storage /// TODO: Maybe remove this and replace it with Some/None and create a `Commit` struct to contain the next commit /// diff --git a/rust/src/delta_config.rs b/rust/src/delta_config.rs index 722f04d5ae..1b7f6b7e0f 100644 --- a/rust/src/delta_config.rs +++ b/rust/src/delta_config.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, str::FromStr}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; -use crate::DeltaTableError; +use crate::errors::DeltaTableError; /// Typed property keys that can be defined on a delta table /// diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 6ce3f1e4ac..7c4fbf8e4c 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -57,13 +57,12 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec; use object_store::ObjectMeta; use url::Url; -use crate::action::Add; +use crate::action::{self, Add}; use crate::builder::ensure_table_uri; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; -use crate::{action, open_table, open_table_with_storage_options, SchemaDataType}; -use crate::{DeltaResult, Invariant}; -use crate::{DeltaTable, DeltaTableError}; +use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant, SchemaDataType}; impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { diff --git a/rust/src/errors.rs b/rust/src/errors.rs new file mode 100644 index 0000000000..40977db937 --- /dev/null +++ b/rust/src/errors.rs @@ -0,0 +1,323 @@ +//! Exceptions for the deltalake crate +use object_store::Error as ObjectStoreError; + +use crate::action::ProtocolError; +use crate::delta_config::DeltaConfigError; +use crate::operations::transaction::TransactionError; + +/// A result returned by delta-rs +pub type DeltaResult = Result; + +/// Delta Table specific error +#[derive(thiserror::Error, Debug)] +pub enum DeltaTableError { + /// Error returned when applying transaction log failed. + #[error("Failed to apply transaction log: {}", .source)] + ApplyLog { + /// Apply error details returned when applying transaction log failed. + #[from] + source: ApplyLogError, + }, + + /// Error returned when loading checkpoint failed. + #[error("Failed to load checkpoint: {}", .source)] + LoadCheckpoint { + /// Load checkpoint error details returned when loading checkpoint failed. + #[from] + source: LoadCheckpointError, + }, + + /// Error returned when reading the delta log object failed. + #[error("Failed to read delta log object: {}", .source)] + ObjectStore { + /// Storage error details when reading the delta log object failed. + #[from] + source: ObjectStoreError, + }, + + /// Error returned when parsing checkpoint parquet. + #[cfg(any(feature = "parquet", feature = "parquet2"))] + #[error("Failed to parse parquet: {}", .source)] + Parquet { + /// Parquet error details returned when reading the checkpoint failed. + #[cfg(feature = "parquet")] + #[from] + source: parquet::errors::ParquetError, + /// Parquet error details returned when reading the checkpoint failed. + #[cfg(feature = "parquet2")] + #[from] + source: parquet2::error::Error, + }, + + /// Error returned when converting the schema in Arrow format failed. 
+ #[cfg(feature = "arrow")] + #[error("Failed to convert into Arrow schema: {}", .source)] + Arrow { + /// Arrow error details returned when converting the schema in Arrow format failed + #[from] + source: arrow::error::ArrowError, + }, + + /// Error returned when the log record has an invalid JSON. + #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)] + InvalidJsonLog { + /// JSON error details returned when parsing the record JSON. + json_err: serde_json::error::Error, + /// invalid log entry content. + line: String, + /// corresponding table version for the log file. + version: i64, + }, + + /// Error returned when the log contains invalid stats JSON. + #[error("Invalid JSON in file stats: {}", .json_err)] + InvalidStatsJson { + /// JSON error details returned when parsing the stats JSON. + json_err: serde_json::error::Error, + }, + + /// Error returned when the log contains invalid stats JSON. + #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")] + InvalidInvariantJson { + /// JSON error details returned when parsing the invariant expression JSON. + json_err: serde_json::error::Error, + /// Invariant expression. + line: String, + }, + + /// Error returned when the DeltaTable has an invalid version. + #[error("Invalid table version: {0}")] + InvalidVersion(i64), + + /// Error returned when the DeltaTable has no data files. + #[error("Corrupted table, cannot read data file {}: {}", .path, .source)] + MissingDataFile { + /// Source error details returned when the DeltaTable has no data files. + source: std::io::Error, + /// The Path used of the DeltaTable + path: String, + }, + + /// Error returned when the datetime string is invalid for a conversion. + #[error("Invalid datetime string: {}", .source)] + InvalidDateTimeString { + /// Parse error details returned of the datetime string parse error. + #[from] + source: chrono::ParseError, + }, + + /// Error returned when the action record is invalid in log. + #[error("Invalid action record found in log: {}", .source)] + InvalidAction { + /// Action error details returned of the invalid action. + #[from] + source: ProtocolError, + }, + + /// Error returned when attempting to write bad data to the table + #[error("Attempted to write invalid data to the table: {:#?}", violations)] + InvalidData { + /// Action error details returned of the invalid action. + violations: Vec, + }, + + /// Error returned when it is not a DeltaTable. + #[error("Not a Delta table: {0}")] + NotATable(String), + + /// Error returned when no metadata was found in the DeltaTable. + #[error("No metadata found, please make sure table is loaded.")] + NoMetadata, + + /// Error returned when no schema was found in the DeltaTable. + #[error("No schema found, please make sure table is loaded.")] + NoSchema, + + /// Error returned when no partition was found in the DeltaTable. + #[error("No partitions found, please make sure table is partitioned.")] + LoadPartitions, + + /// Error returned when writes are attempted with data that doesn't match the schema of the + /// table + #[error("Data does not match the schema or partitions of the table: {}", msg)] + SchemaMismatch { + /// Information about the mismatch + msg: String, + }, + + /// Error returned when a partition is not formatted as a Hive Partition. + #[error("This partition is not formatted with key=value: {}", .partition)] + PartitionError { + /// The malformed partition used. 
+        partition: String,
+    },
+
+    /// Error returned when an invalid partition filter was found.
+    #[error("Invalid partition filter found: {}.", .partition_filter)]
+    InvalidPartitionFilter {
+        /// The invalid partition filter used.
+        partition_filter: String,
+    },
+
+    /// Error returned when a partition filter uses a nonpartitioned column.
+    #[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)]
+    ColumnsNotPartitioned {
+        /// The columns used in the partition filter that are not partitioned
+        nonpartitioned_columns: Vec<String>,
+    },
+
+    /// Error returned when a line from log record is invalid.
+    #[error("Failed to read line from log record")]
+    Io {
+        /// Source error details returned while reading the log record.
+        #[from]
+        source: std::io::Error,
+    },
+
+    /// Error raised while committing transaction
+    #[error("Transaction failed: {source}")]
+    Transaction {
+        /// The source error
+        source: TransactionError,
+    },
+
+    /// Error returned when a transaction fails to commit because the given version already exists.
+    #[error("Delta transaction failed, version {0} already exists.")]
+    VersionAlreadyExists(i64),
+
+    /// Error returned when user attempts to commit actions that don't belong to the next version.
+    #[error("Delta transaction failed, version {0} does not follow {1}")]
+    VersionMismatch(i64, i64),
+
+    /// A feature is missing to perform the operation
+    #[error("Delta-rs must be built with feature '{feature}' to support loading from: {url}.")]
+    MissingFeature {
+        /// Name of the missing feature
+        feature: &'static str,
+        /// Storage location url
+        url: String,
+    },
+
+    /// Error returned when the storage location cannot be inferred
+    #[error("Cannot infer storage location from: {0}")]
+    InvalidTableLocation(String),
+
+    /// Error returned when serializing the commit log to JSON fails
+    #[error("Log JSON serialization error: {json_err}")]
+    SerializeLogJson {
+        /// JSON serialization error
+        json_err: serde_json::error::Error,
+    },
+
+    /// Error returned when serializing the table schema to JSON fails
+    #[error("Schema JSON serialization error: {json_err}")]
+    SerializeSchemaJson {
+        /// JSON serialization error
+        json_err: serde_json::error::Error,
+    },
+
+    /// Generic Delta Table error
+    #[error("Generic DeltaTable error: {0}")]
+    Generic(String),
+
+    /// Generic Delta Table error wrapping a source error
+    #[error("Generic error: {source}")]
+    GenericError {
+        /// Source error
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+}
+
+impl From<object_store::path::Error> for DeltaTableError {
+    fn from(err: object_store::path::Error) -> Self {
+        Self::GenericError {
+            source: Box::new(err),
+        }
+    }
+}
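Reviewer note: with `DeltaTableError` now public in `crate::errors`, downstream crates can match on it without reaching into the crate root. A small sketch using only variants defined above; `is_missing_table` is a hypothetical helper, not crate API:

```rust
use deltalake::errors::DeltaTableError;

// Treat "not a table" and "bad version" as the same user-facing condition.
fn is_missing_table(err: &DeltaTableError) -> bool {
    matches!(
        err,
        DeltaTableError::NotATable(_) | DeltaTableError::InvalidVersion(_)
    )
}
```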
+ #[error("Failed to read line from log record")] + Io { + /// Source error details returned while reading the log record. + #[from] + source: std::io::Error, + }, + + /// Error returned when the action record is invalid in log. + #[error("Invalid action record found in log: {}", .source)] + InvalidAction { + /// Action error details returned of the invalid action. + #[from] + source: ProtocolError, + }, +} + +impl From for ApplyLogError { + fn from(error: ObjectStoreError) -> Self { + match error { + ObjectStoreError::NotFound { .. } => ApplyLogError::EndOfLog, + _ => ApplyLogError::Storage { source: error }, + } + } +} + +/// Error related to checkpoint loading +#[derive(thiserror::Error, Debug)] +pub enum LoadCheckpointError { + /// Error returned when the JSON checkpoint is not found. + #[error("Checkpoint file not found")] + NotFound, + /// Error returned when the JSON checkpoint is invalid. + #[error("Invalid JSON in checkpoint: {source}")] + InvalidJson { + /// Error details returned while reading the JSON. + #[from] + source: serde_json::error::Error, + }, + /// Error returned when it failed to read the checkpoint content. + #[error("Failed to read checkpoint content: {source}")] + Storage { + /// Storage error details returned while reading the checkpoint content. + source: ObjectStoreError, + }, +} + +impl From for LoadCheckpointError { + fn from(error: ObjectStoreError) -> Self { + match error { + ObjectStoreError::NotFound { .. } => LoadCheckpointError::NotFound, + _ => LoadCheckpointError::Storage { source: error }, + } + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index ae6ea740df..5323c34eff 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -87,6 +87,7 @@ pub mod builder; pub mod data_catalog; pub mod delta; pub mod delta_config; +pub mod errors; pub mod operations; pub mod partitions; pub mod schema; @@ -97,8 +98,6 @@ pub mod time_utils; #[cfg(all(feature = "arrow"))] pub mod table_state_arrow; -#[cfg(all(feature = "arrow", feature = "parquet"))] -pub mod checkpoints; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod delta_arrow; #[cfg(feature = "datafusion")] @@ -112,10 +111,13 @@ pub use self::delta::*; pub use self::delta_config::*; pub use self::partitions::*; pub use self::schema::*; +pub use errors::*; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; pub use operations::DeltaOps; // convenience exports for consumers to avoid aligning crate versions +#[cfg(all(feature = "arrow", feature = "parquet"))] +pub use action::checkpoints; #[cfg(feature = "arrow")] pub use arrow; #[cfg(feature = "datafusion")] diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 2c6a310f85..697ab3ef1d 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -1,19 +1,21 @@ //! 
Command for creating a new delta table // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala + +use std::collections::HashMap; +use std::sync::Arc; + +use futures::future::BoxFuture; +use serde_json::{Map, Value}; + use super::transaction::commit; use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION}; use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode}; use crate::builder::ensure_table_uri; use crate::delta_config::DeltaConfigKey; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct}; use crate::storage::DeltaObjectStore; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; -use crate::{DeltaTableBuilder, DeltaTableMetaData}; - -use futures::future::BoxFuture; -use serde_json::{Map, Value}; -use std::collections::HashMap; -use std::sync::Arc; +use crate::{DeltaTable, DeltaTableBuilder, DeltaTableMetaData}; #[derive(thiserror::Error, Debug)] enum CreateError { diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 2a12985ac2..4bca7778e5 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -17,18 +17,10 @@ //! .await?; //! ```` -use crate::action::DeltaOperation; -use crate::delta::DeltaResult; -use crate::delta_datafusion::parquet_scan_from_actions; -use crate::delta_datafusion::partitioned_file_from_action; -use crate::delta_datafusion::register_store; -use crate::operations::transaction::commit; -use crate::operations::write::write_execution_plan; -use crate::storage::DeltaObjectStore; -use crate::storage::ObjectStoreRef; -use crate::table_state::DeltaTableState; -use crate::DeltaTable; -use crate::DeltaTableError; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use crate::action::{Action, Add, Remove}; use arrow::array::StringArray; @@ -59,10 +51,17 @@ use futures::stream::StreamExt; use parquet::file::properties::WriterProperties; use serde_json::Map; use serde_json::Value; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use crate::action::DeltaOperation; +use crate::delta_datafusion::{ + parquet_scan_from_actions, partitioned_file_from_action, register_store, +}; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::operations::transaction::commit; +use crate::operations::write::write_execution_plan; +use crate::storage::{DeltaObjectStore, ObjectStoreRef}; +use crate::table_state::DeltaTableState; +use crate::DeltaTable; const PATH_COLUMN: &str = "__delta_rs_path"; diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 0448de228c..18a387d3db 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -11,22 +11,26 @@ //! let mut table = open_table("../path/to/table")?; //! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?; //! 
```` -use crate::action::{Action, Add, DeltaOperation, Remove}; -use crate::operations::transaction::commit; -use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; -use futures::future::BoxFuture; -use futures::StreamExt; -pub use object_store::path::Path; -use object_store::ObjectStore; + use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; + +use futures::future::BoxFuture; +use futures::StreamExt; +pub use object_store::path::Path; +use object_store::ObjectStore; use url::{ParseError, Url}; +use crate::action::{Action, Add, DeltaOperation, Remove}; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::operations::transaction::commit; +use crate::storage::DeltaObjectStore; +use crate::table_state::DeltaTableState; +use crate::DeltaTable; + /// Audit the Delta Table's active files with the underlying file system. /// See this module's documentaiton for more information #[derive(Debug)] diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs index 2be717e549..9501c18011 100644 --- a/rust/src/operations/load.rs +++ b/rust/src/operations/load.rs @@ -6,9 +6,10 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::DeltaTable; #[derive(Debug, Clone)] pub struct LoadBuilder { diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index a7da3a2d1d..786f455344 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -11,7 +11,8 @@ use self::create::CreateBuilder; use self::filesystem_check::FileSystemCheckBuilder; use self::vacuum::VacuumBuilder; use crate::builder::DeltaTableBuilder; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::DeltaTable; pub mod create; pub mod filesystem_check; diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index 2a902cba25..493abfb03b 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -20,14 +20,10 @@ //! let (table, metrics) = OptimizeBuilder::new(table.object_store(), table.state).await?; //! 
```` -use super::transaction::commit; -use super::writer::{PartitionWriter, PartitionWriterConfig}; -use crate::action::{self, Action, DeltaOperation}; -use crate::crate_version; -use crate::storage::ObjectStoreRef; -use crate::table_state::DeltaTableState; -use crate::writer::utils::arrow_schema_without_partitions; -use crate::{DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; @@ -38,9 +34,15 @@ use parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use serde::{Deserialize, Serialize}; use serde_json::Map; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; + +use super::transaction::commit; +use super::writer::{PartitionWriter, PartitionWriterConfig}; +use crate::action::{self, Action, DeltaOperation}; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::storage::ObjectStoreRef; +use crate::table_state::DeltaTableState; +use crate::writer::utils::arrow_schema_without_partitions; +use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter}; /// Metrics from Optimize #[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs index dd27a71cd2..1058400787 100644 --- a/rust/src/operations/transaction/conflict_checker.rs +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -7,8 +7,9 @@ use object_store::ObjectStore; use super::CommitInfo; use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove}; use crate::delta_config::IsolationLevel; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::commit_uri_from_version; -use crate::{table_state::DeltaTableState, DeltaResult, DeltaTableError}; +use crate::table_state::DeltaTableState; #[cfg(feature = "datafusion")] use super::state::AddContainer; diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index cae0f9e610..ba285bd046 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -6,9 +6,10 @@ use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::{Map, Value}; use crate::action::{Action, CommitInfo, DeltaOperation}; +use crate::crate_version; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::storage::commit_uri_from_version; use crate::table_state::DeltaTableState; -use crate::{crate_version, DeltaResult, DeltaTableError}; mod conflict_checker; #[cfg(feature = "datafusion")] diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index 397601c547..cb1b336c3c 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -23,9 +23,8 @@ use crate::action::Add; use crate::delta_datafusion::{ get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value, }; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::table_state::DeltaTableState; -use crate::DeltaResult; -use crate::DeltaTableError; impl DeltaTableState { /// Get the table schema as an [`ArrowSchemaRef`] diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs index f381d282b1..035f6f7c53 100644 --- 
a/rust/src/operations/vacuum.rs +++ b/rust/src/operations/vacuum.rs @@ -21,16 +21,19 @@ //! let (table, metrics) = VacuumBuilder::new(table.object_store(), table.state).await?; //! ```` -use crate::storage::DeltaObjectStore; -use crate::table_state::DeltaTableState; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use std::collections::HashSet; +use std::fmt::Debug; +use std::sync::Arc; + use chrono::{Duration, Utc}; use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; use object_store::{path::Path, ObjectStore}; -use std::collections::HashSet; -use std::fmt::Debug; -use std::sync::Arc; + +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::storage::DeltaObjectStore; +use crate::table_state::DeltaTableState; +use crate::DeltaTable; /// Errors that can occur during vacuum #[derive(thiserror::Error, Debug)] diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index 1ab9a05307..f70adcd120 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -19,27 +19,28 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use arrow_array::RecordBatch; +use arrow_cast::{can_cast_types, cast}; +use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; +use futures::future::BoxFuture; +use futures::StreamExt; +use parquet::file::properties::WriterProperties; + use super::writer::{DeltaWriter, WriterConfig}; use super::MAX_SUPPORTED_WRITER_VERSION; use super::{transaction::commit, CreateBuilder}; use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode}; -use crate::delta::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::delta::DeltaTable; use crate::delta_datafusion::DeltaDataChecker; +use crate::errors::{DeltaResult, DeltaTableError}; use crate::schema::Schema; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table_state::DeltaTableState; use crate::writer::record_batch::divide_by_partition_values; use crate::writer::utils::PartitionPath; -use arrow_array::RecordBatch; -use arrow_cast::{can_cast_types, cast}; -use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; -use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; -use futures::future::BoxFuture; -use futures::StreamExt; -use parquet::file::properties::WriterProperties; - #[derive(thiserror::Error, Debug)] enum WriteError { #[error("No data source supplied to write command.")] diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs index b6134386c1..893fac7a0e 100644 --- a/rust/src/operations/writer.rs +++ b/rust/src/operations/writer.rs @@ -2,16 +2,6 @@ use std::collections::HashMap; -use crate::action::Add; -use crate::storage::ObjectStoreRef; -use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; -use crate::writer::stats::create_add; -use crate::writer::utils::{ - arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath, - ShareableBuffer, -}; -use crate::{crate_version, DeltaResult, DeltaTableError}; - use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; @@ -21,6 +11,17 @@ use parquet::arrow::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties;
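The `WriteError` enum opening above follows the same pattern as `CreateError` earlier in the patch: each operation keeps a private, operation-specific error type and converts it into the crate-wide `DeltaTableError` at its boundary. A hedged, standalone sketch of that shape; routing every variant through `Generic` is a simplification for the example, not how the crate maps them:

// Private, operation-local error type, as the operations modules define.
#[derive(thiserror::Error, Debug)]
enum WriteError {
    #[error("No data source supplied to write command.")]
    MissingData,
}

// Crate-wide error stand-in for this sketch.
#[derive(thiserror::Error, Debug)]
enum DeltaTableError {
    #[error("Generic DeltaTable error: {0}")]
    Generic(String),
}

// Convert at the operation boundary so callers only see DeltaTableError.
impl From<WriteError> for DeltaTableError {
    fn from(err: WriteError) -> Self {
        DeltaTableError::Generic(err.to_string())
    }
}

fn main() {
    let err: DeltaTableError = WriteError::MissingData.into();
    println!("{err}");
}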
+use crate::action::Add; +use crate::crate_version; +use crate::errors::{DeltaResult, DeltaTableError}; +use crate::storage::ObjectStoreRef; +use crate::writer::record_batch::{divide_by_partition_values, PartitionResult}; +use crate::writer::stats::create_add; +use crate::writer::utils::{ + arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath, + ShareableBuffer, +}; + // TODO databricks often suggests a file size of 100mb, should we set this default? const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600; const DEFAULT_WRITE_BATCH_SIZE: usize = 1024; diff --git a/rust/src/partitions.rs b/rust/src/partitions.rs index 47f8a5cd61..ed4a5a2eaf 100644 --- a/rust/src/partitions.rs +++ b/rust/src/partitions.rs @@ -3,7 +3,7 @@ use std::convert::TryFrom; use super::schema::SchemaDataType; -use crate::DeltaTableError; +use crate::errors::DeltaTableError; use std::cmp::Ordering; use std::collections::HashMap; diff --git a/rust/src/schema.rs b/rust/src/schema.rs index cd3022fd90..2602c5cd68 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema.rs @@ -6,7 +6,7 @@ use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; -use crate::DeltaTableError; +use crate::errors::DeltaTableError; /// Type alias for a string expected to match a GUID/UUID format pub type Guid = String; diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index de03ed009d..873f29dd1b 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -1,16 +1,18 @@ //! Configuration handling for defining Storage backends for DeltaTables. -use super::file::FileStorageBackend; -use super::utils::str_is_truthy; -use crate::{DeltaResult, DeltaTableError}; +use std::collections::HashMap; +use std::sync::Arc; + use object_store::memory::InMemory; use object_store::path::Path; use object_store::prefix::PrefixStore; use object_store::{DynObjectStore, ObjectStore}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; use url::Url; +use super::file::FileStorageBackend; +use super::utils::str_is_truthy; +use crate::errors::{DeltaResult, DeltaTableError}; + #[cfg(any(feature = "s3", feature = "s3-native-tls"))] use super::s3::{S3StorageBackend, S3StorageOptions}; #[cfg(feature = "hdfs")] diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 422ecedadf..20ce1bc97d 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -1,11 +1,9 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data -pub mod config; -pub mod file; -pub mod utils; - -use self::config::StorageOptions; -use crate::DeltaResult; +use std::collections::HashMap; +use std::fmt; +use std::ops::Range; +use std::sync::Arc; use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; @@ -13,13 +11,16 @@ use lazy_static::lazy_static; use serde::de::{Error, SeqAccess, Visitor}; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::collections::HashMap; -use std::fmt; -use std::ops::Range; -use std::sync::Arc; use tokio::io::AsyncWrite; use url::Url; +use self::config::StorageOptions; +use crate::errors::DeltaResult; + +pub mod config; +pub mod file; +pub mod utils; + #[cfg(any(feature = "s3", feature = "s3-native-tls"))] pub mod s3; diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index 501c4bdd63..edfe5ddd95 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -1,15 +1,16 @@ //! 
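Aside from the `errors` move, most hunks here (`storage/config.rs`, `storage/mod.rs`, and the writer above) only regroup imports into a fixed order: standard library first, external crates second, crate-local paths last, with blank lines between groups. A self-contained illustration of the convention; the `errors` stub and the `serde_json` dependency exist only so the example compiles:

use std::collections::HashMap; // 1. standard library

use serde_json::Value; // 2. external crates

use crate::errors::DeltaTableError; // 3. crate-local paths, always last

// Stub module so the crate-local import above resolves in this sketch.
mod errors {
    #[derive(Debug)]
    pub enum DeltaTableError {
        Generic(String),
    }
}

fn main() {
    let mut commit_info: HashMap<String, Value> = HashMap::new();
    commit_info.insert("operation".into(), Value::String("demo".into()));
    let err = DeltaTableError::Generic("demo error".into());
    println!("{commit_info:?} / {err:?}");
}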
Utility functions for working across Delta tables use std::collections::HashMap; +use std::sync::Arc; -use crate::action::Add; -use crate::builder::DeltaTableBuilder; -use crate::{DeltaResult, DeltaTableError}; use chrono::{DateTime, NaiveDateTime, Utc}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, ObjectMeta, Result as ObjectStoreResult}; -use std::sync::Arc; + +use crate::action::Add; +use crate::builder::DeltaTableBuilder; +use crate::errors::{DeltaResult, DeltaTableError}; /// Copies the contents from the `from` location into the `to` location pub async fn copy_table( @@ -82,7 +83,7 @@ impl TryFrom<&Add> for ObjectMeta { let last_modified = DateTime::<Utc>::from_utc( NaiveDateTime::from_timestamp_millis(value.modification_time).ok_or( DeltaTableError::InvalidAction { - source: crate::action::ActionError::InvalidField(format!( + source: crate::action::ProtocolError::InvalidField(format!( "invalid modification_time: {:?}", value.modification_time )), diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 631b41781e..1d76a4bada 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -1,20 +1,23 @@ //! The module for delta table state. +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::TryFrom; +use std::io::{BufRead, BufReader, Cursor}; + +use chrono::Utc; +use lazy_static::lazy_static; +use object_store::{path::Path, ObjectStore}; +use serde::{Deserialize, Serialize}; + use crate::action::{self, Action, Add}; use crate::delta_config::TableConfig; +use crate::errors::{ApplyLogError, DeltaTableError}; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::schema::SchemaDataType; use crate::storage::commit_uri_from_version; use crate::Schema; -use crate::{ApplyLogError, DeltaTable, DeltaTableError, DeltaTableMetaData}; -use chrono::Utc; -use lazy_static::lazy_static; -use object_store::{path::Path, ObjectStore}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::collections::HashSet; -use std::convert::TryFrom; -use std::io::{BufRead, BufReader, Cursor}; +use crate::{DeltaTable, DeltaTableMetaData}; #[cfg(any(feature = "parquet", feature = "parquet2"))] use super::{CheckPoint, DeltaTableConfig}; @@ -100,7 +103,7 @@ impl DeltaTableState { let preader = SerializedFileReader::new(data)?; let schema = preader.metadata().file_metadata().schema(); if !schema.is_group() { - return Err(DeltaTableError::from(action::ActionError::Generic( + return Err(DeltaTableError::from(action::ProtocolError::Generic( "Action record in checkpoint should be a struct".to_string(), ))); } @@ -123,7 +126,7 @@ impl DeltaTableState { for row_group in metadata.row_groups { for action in actions_from_row_group(row_group, &mut reader) - .map_err(action::ActionError::from)? + .map_err(action::ProtocolError::from)? { self.process_action( action, diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 67a37edad2..48cf8c2de2 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -2,22 +2,25 @@ //! //! See [crate::table_state::DeltaTableState].
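The `TryFrom<&Add> for ObjectMeta` hunk above validates the millisecond timestamp before building a `DateTime<Utc>`, mapping an out-of-range value to the renamed `ProtocolError::InvalidField`. The same logic re-created standalone; `Add` and `ProtocolError` here are trimmed stand-ins for the crate's types:

use chrono::{DateTime, NaiveDateTime, Utc};

// Trimmed stand-in for the crate's `Add` action.
struct Add {
    modification_time: i64,
}

#[derive(thiserror::Error, Debug)]
enum ProtocolError {
    #[error("Invalid action field: {0}")]
    InvalidField(String),
}

fn last_modified(add: &Add) -> Result<DateTime<Utc>, ProtocolError> {
    // `from_timestamp_millis` returns None for out-of-range inputs, which
    // is exactly the case the InvalidField error reports.
    let naive = NaiveDateTime::from_timestamp_millis(add.modification_time).ok_or_else(|| {
        ProtocolError::InvalidField(format!(
            "invalid modification_time: {:?}",
            add.modification_time
        ))
    })?;
    Ok(DateTime::<Utc>::from_utc(naive, Utc))
}

fn main() {
    println!("{:?}", last_modified(&Add { modification_time: 1_683_000_000_000 }));
    println!("{:?}", last_modified(&Add { modification_time: i64::MAX }));
}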
-use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; -use crate::table_state::DeltaTableState; -use crate::DeltaTableError; -use crate::SchemaDataType; -use crate::SchemaTypeStruct; -use arrow::array::{ +use std::borrow::Cow; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::Arc; + +use arrow::compute::cast; +use arrow::compute::kernels::cast_utils::Parser; +use arrow_array::types::{Date32Type, TimestampMicrosecondType}; +use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray, StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, }; -use arrow::compute::cast; -use arrow::compute::kernels::cast_utils::Parser; -use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimeUnit, TimestampMicrosecondType}; +use arrow_schema::{DataType, Field, Fields, TimeUnit}; use itertools::Itertools; -use std::borrow::Cow; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::Arc; + +use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; +use crate::errors::DeltaTableError; +use crate::table_state::DeltaTableState; +use crate::SchemaDataType; +use crate::SchemaTypeStruct; impl DeltaTableState { /// Get an [arrow::record_batch::RecordBatch] containing add action data. diff --git a/rust/src/writer/json.rs b/rust/src/writer/json.rs index c601f7c1e4..b8b4e48713 100644 --- a/rust/src/writer/json.rs +++ b/rust/src/writer/json.rs @@ -3,16 +3,6 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; -use super::stats::create_add; -use super::utils::{ - arrow_schema_without_partitions, next_data_path, record_batch_from_message, - record_batch_without_partitions, stringified_partition_value, -}; -use super::{DeltaWriter, DeltaWriterError}; -use crate::builder::DeltaTableBuilder; -use crate::{action::Add, DeltaTable, DeltaTableError, DeltaTableMetaData, Schema}; -use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; - use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::record_batch::*; use bytes::Bytes; @@ -24,6 +14,17 @@ use parquet::{ }; use serde_json::Value; +use super::stats::create_add; +use super::utils::{ + arrow_schema_without_partitions, next_data_path, record_batch_from_message, + record_batch_without_partitions, stringified_partition_value, +}; +use super::{DeltaWriter, DeltaWriterError}; +use crate::builder::DeltaTableBuilder; +use crate::errors::DeltaTableError; +use crate::{action::Add, DeltaTable, DeltaTableMetaData, Schema}; +use crate::{storage::DeltaObjectStore, writer::utils::ShareableBuffer}; + type BadValue = (Value, ParquetError); /// Writes messages to a delta lake table. diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index 2dfe5a3822..e7dc5a38c5 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -1,15 +1,16 @@ #![cfg(all(feature = "arrow", feature = "parquet"))] //! 
Abstractions and implementations for writing data to delta tables -use crate::action::{Action, Add, ColumnCountStat}; -use crate::{DeltaTable, DeltaTableError}; - use arrow::{datatypes::SchemaRef, error::ArrowError}; use async_trait::async_trait; use object_store::Error as ObjectStoreError; use parquet::errors::ParquetError; use serde_json::Value; +use crate::action::{Action, Add, ColumnCountStat}; +use crate::errors::DeltaTableError; +use crate::DeltaTable; + pub use json::JsonWriter; pub use record_batch::RecordBatchWriter; diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 4f2789cd32..78d491fae0 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -28,18 +28,6 @@ //! ``` use std::{collections::HashMap, sync::Arc}; -use super::{ - stats::create_add, - utils::{ - arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, - stringified_partition_value, PartitionPath, - }, - DeltaWriter, DeltaWriterError, -}; -use crate::builder::DeltaTableBuilder; -use crate::writer::utils::ShareableBuffer; -use crate::DeltaTableError; -use crate::{action::Add, storage::DeltaObjectStore, DeltaTable, DeltaTableMetaData, Schema}; use arrow::array::{Array, UInt32Array}; use arrow::compute::{lexicographical_partition_ranges, take, SortColumn}; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; @@ -52,6 +40,16 @@ use object_store::ObjectStore; use parquet::{arrow::ArrowWriter, errors::ParquetError}; use parquet::{basic::Compression, file::properties::WriterProperties}; +use super::stats::create_add; +use super::utils::{ + arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, + stringified_partition_value, PartitionPath, ShareableBuffer, +}; +use super::{DeltaWriter, DeltaWriterError}; +use crate::builder::DeltaTableBuilder; +use crate::errors::DeltaTableError; +use crate::{action::Add, storage::DeltaObjectStore, DeltaTable, DeltaTableMetaData, Schema}; + /// Writes messages to a delta lake table. 
pub struct RecordBatchWriter { storage: Arc<DeltaObjectStore>, diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index faef0ee670..2e0878c6d6 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -1,5 +1,7 @@ -use super::*; -use crate::action::{Add, ColumnValueStat, Stats}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{collections::HashMap, ops::AddAssign}; + use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{basic::LogicalType, errors::ParquetError}; @@ -7,9 +9,9 @@ use parquet::{ file::{metadata::RowGroupMetaData, statistics::Statistics}, format::TimeUnit, }; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, ops::AddAssign}; + +use super::*; +use crate::action::{Add, ColumnValueStat, Stats}; pub(crate) fn create_add( partition_values: &HashMap<String, Option<String>>, @@ -455,7 +457,8 @@ mod tests { use crate::{ action::{ColumnCountStat, ColumnValueStat}, builder::DeltaTableBuilder, - DeltaTable, DeltaTableError, + errors::DeltaTableError, + DeltaTable, }; use lazy_static::lazy_static; use parquet::data_type::{ByteArray, FixedLenByteArray}; diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs index c37bb687f1..e21e1dfd1f 100644 --- a/rust/src/writer/utils.rs +++ b/rust/src/writer/utils.rs @@ -4,9 +4,6 @@ use std::fmt::Display; use std::io::Write; use std::sync::Arc; -use crate::writer::DeltaWriterError; -use crate::DeltaResult; - use arrow::array::{ as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array, }; @@ -23,6 +20,9 @@ use parking_lot::RwLock; use serde_json::Value; use uuid::Uuid; +use crate::errors::DeltaResult; +use crate::writer::DeltaWriterError; + const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d"; const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S"; diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index b39ac39cac..1b3fec85fb 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -4,7 +4,7 @@ use deltalake::test_utils::{ set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables, }; use deltalake::Path; -use deltalake::{DeltaOps, DeltaTableError}; +use deltalake::{errors::DeltaTableError, DeltaOps}; use serial_test::serial; mod common; diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index aa0263e940..7d03d663a4 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -1,5 +1,9 @@ #![cfg(all(feature = "arrow", feature = "parquet"))] +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use std::{collections::HashMap, error::Error, sync::Arc}; + use arrow::datatypes::Schema as ArrowSchema; use arrow::{ array::{Int32Array, StringArray}, @@ -7,16 +11,14 @@ record_batch::RecordBatch, }; use deltalake::action::{Action, Remove}; +use deltalake::errors::DeltaTableError; use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics}; use deltalake::operations::DeltaOps; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; -use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, SchemaDataType, SchemaField}; +use deltalake::{DeltaTable, PartitionFilter, SchemaDataType, SchemaField}; use parquet::file::properties::WriterProperties; use rand::prelude::*; use serde_json::json; -use std::time::SystemTime; -use std::time::UNIX_EPOCH;
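The test changes here exercise the writer API through `deltalake::writer::{DeltaWriter, RecordBatchWriter}`. A hedged end-to-end sketch of that API, assuming the `for_table`, `write`, and `flush_and_commit` methods as used in the crate's writer examples, plus a `From<ArrowError>` conversion on `DeltaTableError`; the table path and schema are invented:

use std::sync::Arc;

use deltalake::arrow::array::{Int32Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::errors::DeltaTableError;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), DeltaTableError> {
    // Hypothetical table whose schema is (id: int, name: string).
    let mut table = deltalake::open_table("../path/to/table").await?;

    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["alpha", "beta"])),
        ],
    )?;

    // Buffer the batch, then flush parquet files and commit the add
    // actions to the log in one call.
    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch).await?;
    let version = writer.flush_and_commit(&mut table).await?;
    println!("committed version {version}");
    Ok(())
}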
-use std::{collections::HashMap, error::Error, sync::Arc}; use tempdir::TempDir; struct Context { diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs index e5e101367d..c4361ac7bf 100644 --- a/rust/tests/integration_checkpoint.rs +++ b/rust/tests/integration_checkpoint.rs @@ -4,7 +4,7 @@ use chrono::Utc; use deltalake::checkpoints::{cleanup_expired_logs_for, create_checkpoint}; use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult}; use deltalake::writer::{DeltaWriter, JsonWriter}; -use deltalake::{DeltaOps, DeltaResult, DeltaTableBuilder, ObjectStore, SchemaDataType}; +use deltalake::{errors::DeltaResult, DeltaOps, DeltaTableBuilder, ObjectStore, SchemaDataType}; use object_store::path::Path; use serde_json::json; use serial_test::serial; diff --git a/rust/tests/integration_commit.rs b/rust/tests/integration_commit.rs index 368e72f1b9..e8f6e8bd91 100644 --- a/rust/tests/integration_commit.rs +++ b/rust/tests/integration_commit.rs @@ -4,7 +4,7 @@ mod fs_common; use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; -use deltalake::{action, DeltaTableBuilder, DeltaTableError}; +use deltalake::{action, errors::DeltaTableError, DeltaTableBuilder}; use serial_test::serial; use std::collections::HashMap; @@ -151,7 +151,7 @@ mod simple_commit_fs { let result = table.try_commit_transaction(&commit, 1).await; match result { - Err(deltalake::DeltaTableError::VersionAlreadyExists(_)) => { + Err(DeltaTableError::VersionAlreadyExists(_)) => { assert!(true, "Delta version already exists."); } _ => { diff --git a/rust/tests/read_delta_partitions_test.rs b/rust/tests/read_delta_partitions_test.rs index d0142f9460..522c06646d 100644 --- a/rust/tests/read_delta_partitions_test.rs +++ b/rust/tests/read_delta_partitions_test.rs @@ -1,9 +1,8 @@ -extern crate deltalake; - -use deltalake::schema::SchemaDataType; use std::collections::HashMap; use std::convert::TryFrom; +use deltalake::schema::SchemaDataType; + #[allow(dead_code)] mod fs_common; @@ -22,7 +21,7 @@ fn test_create_delta_table_partition() { let _wrong_path = "year=2021/month="; assert!(matches!( deltalake::DeltaTablePartition::try_from(_wrong_path).unwrap_err(), - deltalake::DeltaTableError::PartitionError { + deltalake::errors::DeltaTableError::PartitionError { partition: _wrong_path }, )) diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index ff30294ff7..dc70d1fd2d 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -563,7 +563,7 @@ async fn read_empty_folder() { assert!(matches!( result.unwrap_err(), - deltalake::DeltaTableError::NotATable(_), + deltalake::errors::DeltaTableError::NotATable(_), )); let dir = std::env::temp_dir(); @@ -575,7 +575,7 @@ async fn read_empty_folder() { assert!(matches!( result.unwrap_err(), - deltalake::DeltaTableError::NotATable(_), + deltalake::errors::DeltaTableError::NotATable(_), )); }
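With the move complete, integration tests match error variants through the new module path, as the two `read_empty_folder` assertions above show. A minimal test in the same style; it requires a `tokio` dev-dependency, and the directory is just any path that holds no Delta log:

use deltalake::errors::DeltaTableError;

#[tokio::test]
async fn open_empty_dir_is_not_a_table() {
    let dir = std::env::temp_dir();
    let result = deltalake::open_table(dir.to_str().unwrap()).await;
    // The variant is matched through deltalake::errors after this change.
    assert!(matches!(result.unwrap_err(), DeltaTableError::NotATable(_)));
}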