From 3ed986a8f95f6246b7f516a772452cecf8150e12 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sun, 22 Oct 2023 18:45:00 +0200 Subject: [PATCH] chore: copy over kernel types --- rust/src/kernel/actions/arrow.rs | 275 +++++++++++++ rust/src/kernel/actions/mod.rs | 620 +++++++++++++++++++++++++++++ rust/src/kernel/actions/schemas.rs | 252 ++++++++++++ rust/src/kernel/actions/types.rs | 462 +++++++++++++++++++++ rust/src/kernel/error.rs | 61 +++ rust/src/kernel/mod.rs | 5 + rust/src/kernel/schema.rs | 554 ++++++++++++++++++++++++++ 7 files changed, 2229 insertions(+) create mode 100644 rust/src/kernel/actions/arrow.rs create mode 100644 rust/src/kernel/actions/mod.rs create mode 100644 rust/src/kernel/actions/schemas.rs create mode 100644 rust/src/kernel/actions/types.rs create mode 100644 rust/src/kernel/error.rs create mode 100644 rust/src/kernel/mod.rs create mode 100644 rust/src/kernel/schema.rs diff --git a/rust/src/kernel/actions/arrow.rs b/rust/src/kernel/actions/arrow.rs new file mode 100644 index 0000000000..5408eb8ccd --- /dev/null +++ b/rust/src/kernel/actions/arrow.rs @@ -0,0 +1,275 @@ +use std::sync::Arc; + +use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + SchemaRef as ArrowSchemaRef, TimeUnit, +}; + +use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType}; + +impl TryFrom<&StructType> for ArrowSchema { + type Error = ArrowError; + + fn try_from(s: &StructType) -> Result { + let fields = s + .fields() + .iter() + .map(|f| >::try_from(*f)) + .collect::, ArrowError>>()?; + + Ok(ArrowSchema::new(fields)) + } +} + +impl TryFrom<&StructField> for ArrowField { + type Error = ArrowError; + + fn try_from(f: &StructField) -> Result { + let metadata = f + .metadata() + .iter() + .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) + .collect::>() + .map_err(|err| ArrowError::JsonError(err.to_string()))?; + + let field = ArrowField::new( + f.name(), + ArrowDataType::try_from(f.data_type())?, + f.is_nullable(), + ) + .with_metadata(metadata); + + Ok(field) + } +} + +impl TryFrom<&ArrayType> for ArrowField { + type Error = ArrowError; + + fn try_from(a: &ArrayType) -> Result { + Ok(ArrowField::new( + "item", + ArrowDataType::try_from(a.element_type())?, + a.contains_null(), + )) + } +} + +impl TryFrom<&MapType> for ArrowField { + type Error = ArrowError; + + fn try_from(a: &MapType) -> Result { + Ok(ArrowField::new( + "entries", + ArrowDataType::Struct( + vec![ + ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), + ArrowField::new( + "value", + ArrowDataType::try_from(a.value_type())?, + a.value_contains_null(), + ), + ] + .into(), + ), + false, // always non-null + )) + } +} + +impl TryFrom<&DataType> for ArrowDataType { + type Error = ArrowError; + + fn try_from(t: &DataType) -> Result { + match t { + DataType::Primitive(p) => { + match p { + PrimitiveType::String => Ok(ArrowDataType::Utf8), + PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type + PrimitiveType::Integer => Ok(ArrowDataType::Int32), + PrimitiveType::Short => Ok(ArrowDataType::Int16), + PrimitiveType::Byte => Ok(ArrowDataType::Int8), + PrimitiveType::Float => Ok(ArrowDataType::Float32), + PrimitiveType::Double => Ok(ArrowDataType::Float64), + PrimitiveType::Boolean => Ok(ArrowDataType::Boolean), + PrimitiveType::Binary => Ok(ArrowDataType::Binary), + PrimitiveType::Decimal(precision, scale) => { + let precision = u8::try_from(*precision).map_err(|_| { + ArrowError::SchemaError(format!( 
+ "Invalid precision for decimal: {}", + precision + )) + })?; + let scale = i8::try_from(*scale).map_err(|_| { + ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale)) + })?; + + if precision <= 38 { + Ok(ArrowDataType::Decimal128(precision, scale)) + } else if precision <= 76 { + Ok(ArrowDataType::Decimal256(precision, scale)) + } else { + Err(ArrowError::SchemaError(format!( + "Precision too large to be represented in Arrow: {}", + precision + ))) + } + } + PrimitiveType::Date => { + // A calendar date, represented as a year-month-day triple without a + // timezone. Stored as 4 bytes integer representing days since 1970-01-01 + Ok(ArrowDataType::Date32) + } + PrimitiveType::Timestamp => { + // Issue: https://github.com/delta-io/delta/issues/643 + Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) + } + } + } + DataType::Struct(s) => Ok(ArrowDataType::Struct( + s.fields() + .iter() + .map(|f| >::try_from(*f)) + .collect::, ArrowError>>()? + .into(), + )), + DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), + DataType::Map(m) => Ok(ArrowDataType::Map( + Arc::new(ArrowField::new( + "entries", + ArrowDataType::Struct( + vec![ + ArrowField::new( + "key", + >::try_from(m.key_type())?, + false, + ), + ArrowField::new( + "value", + >::try_from(m.value_type())?, + m.value_contains_null(), + ), + ] + .into(), + ), + true, + )), + false, + )), + } + } +} + +impl TryFrom<&ArrowSchema> for StructType { + type Error = ArrowError; + + fn try_from(arrow_schema: &ArrowSchema) -> Result { + let new_fields: Result, _> = arrow_schema + .fields() + .iter() + .map(|field| field.as_ref().try_into()) + .collect(); + Ok(StructType::new(new_fields?)) + } +} + +impl TryFrom for StructType { + type Error = ArrowError; + + fn try_from(arrow_schema: ArrowSchemaRef) -> Result { + arrow_schema.as_ref().try_into() + } +} + +impl TryFrom<&ArrowField> for StructField { + type Error = ArrowError; + + fn try_from(arrow_field: &ArrowField) -> Result { + Ok(StructField::new( + arrow_field.name().clone(), + arrow_field.data_type().try_into()?, + arrow_field.is_nullable(), + ) + .with_metadata(arrow_field.metadata().iter().map(|(k, v)| (k.clone(), v)))) + } +} + +impl TryFrom<&ArrowDataType> for DataType { + type Error = ArrowError; + + fn try_from(arrow_datatype: &ArrowDataType) -> Result { + match arrow_datatype { + ArrowDataType::Utf8 => Ok(DataType::Primitive(PrimitiveType::String)), + ArrowDataType::LargeUtf8 => Ok(DataType::Primitive(PrimitiveType::String)), + ArrowDataType::Int64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type + ArrowDataType::Int32 => Ok(DataType::Primitive(PrimitiveType::Integer)), + ArrowDataType::Int16 => Ok(DataType::Primitive(PrimitiveType::Short)), + ArrowDataType::Int8 => Ok(DataType::Primitive(PrimitiveType::Byte)), + ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type + ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), + ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), + ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), + ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), + ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), + ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), + ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)), + ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)), + 
+            ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)),
+            ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
+                *p as i32, *s as i32,
+            ))),
+            ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
+                *p as i32, *s as i32,
+            ))),
+            ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)),
+            ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)),
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
+                Ok(DataType::Primitive(PrimitiveType::Timestamp))
+            }
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz))
+                if tz.eq_ignore_ascii_case("utc") =>
+            {
+                Ok(DataType::Primitive(PrimitiveType::Timestamp))
+            }
+            ArrowDataType::Struct(fields) => {
+                let converted_fields: Result<Vec<StructField>, _> = fields
+                    .iter()
+                    .map(|field| field.as_ref().try_into())
+                    .collect();
+                Ok(DataType::Struct(Box::new(StructType::new(
+                    converted_fields?,
+                ))))
+            }
+            ArrowDataType::List(field) => Ok(DataType::Array(Box::new(ArrayType::new(
+                (*field).data_type().try_into()?,
+                (*field).is_nullable(),
+            )))),
+            ArrowDataType::LargeList(field) => Ok(DataType::Array(Box::new(ArrayType::new(
+                (*field).data_type().try_into()?,
+                (*field).is_nullable(),
+            )))),
+            ArrowDataType::FixedSizeList(field, _) => Ok(DataType::Array(Box::new(
+                ArrayType::new((*field).data_type().try_into()?, (*field).is_nullable()),
+            ))),
+            ArrowDataType::Map(field, _) => {
+                if let ArrowDataType::Struct(struct_fields) = field.data_type() {
+                    let key_type = struct_fields[0].data_type().try_into()?;
+                    let value_type = struct_fields[1].data_type().try_into()?;
+                    let value_type_nullable = struct_fields[1].is_nullable();
+                    Ok(DataType::Map(Box::new(MapType::new(
+                        key_type,
+                        value_type,
+                        value_type_nullable,
+                    ))))
+                } else {
+                    Err(ArrowError::SchemaError(
+                        "DataType::Map should contain a struct field child".to_string(),
+                    ))
+                }
+            }
+            s => Err(ArrowError::SchemaError(format!(
+                "Invalid data type for Delta Lake: {s}"
+            ))),
+        }
+    }
+}
diff --git a/rust/src/kernel/actions/mod.rs b/rust/src/kernel/actions/mod.rs
new file mode 100644
index 0000000000..dd00b82cfd
--- /dev/null
+++ b/rust/src/kernel/actions/mod.rs
@@ -0,0 +1,620 @@
+use std::collections::HashMap;
+
+use arrow_array::{
+    BooleanArray, Int32Array, Int64Array, ListArray, MapArray, RecordBatch, StringArray,
+    StructArray,
+};
+use either::Either;
+use fix_hidden_lifetime_bug::fix_hidden_lifetime_bug;
+use itertools::izip;
+
+use super::super::{DeltaResult, Error};
+
+pub(crate) mod arrow;
+pub(crate) mod schemas;
+pub(crate) mod types;
+
+pub use schemas::get_log_schema;
+pub use types::*;
+
+#[derive(Debug)]
+pub enum ActionType {
+    /// modify the data in a table by adding individual logical files
+    Add,
+    /// add a file containing only the data that was changed as part of the transaction
+    Cdc,
+    /// additional provenance information about what higher-level operation was being performed
+    CommitInfo,
+    /// contains a configuration (string-string map) for a named metadata domain
+    DomainMetadata,
+    /// changes the current metadata of the table
+    Metadata,
+    /// increase the version of the Delta protocol that is required to read or write a given table
+    Protocol,
+    /// modify the data in a table by removing individual logical files
+    Remove,
+    /// The Row ID high-water mark tracks the largest ID that has been assigned to a row in the table.
+ RowIdHighWaterMark, + Txn, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum Action { + Metadata(Metadata), + Protocol(Protocol), + Add(Add), + Remove(Remove), +} + +#[fix_hidden_lifetime_bug] +pub(crate) fn parse_actions<'a>( + batch: &RecordBatch, + types: impl IntoIterator, +) -> DeltaResult> { + Ok(types + .into_iter() + .filter_map(|action| parse_action(batch, action).ok()) + .flatten()) +} + +#[fix_hidden_lifetime_bug] +pub(crate) fn parse_action( + batch: &RecordBatch, + action_type: &ActionType, +) -> DeltaResult> { + let column_name = match action_type { + ActionType::Metadata => "metaData", + ActionType::Protocol => "protocol", + ActionType::Add => "add", + ActionType::Remove => "remove", + _ => unimplemented!(), + }; + + let arr = batch + .column_by_name(column_name) + .ok_or(Error::MissingColumn(column_name.into()))? + .as_any() + .downcast_ref::() + .ok_or(Error::UnexpectedColumnType( + "Cannot downcast to StructArray".into(), + ))?; + + match action_type { + ActionType::Metadata => parse_action_metadata(arr), + ActionType::Protocol => parse_action_protocol(arr), + ActionType::Add => parse_actions_add(arr), + ActionType::Remove => parse_actions_remove(arr), + _ => todo!(), + } +} + +fn parse_action_metadata(arr: &StructArray) -> DeltaResult>> { + let ids = cast_struct_column::(arr, "id")?; + let schema_strings = cast_struct_column::(arr, "schemaString")?; + let metadata = ids + .into_iter() + .zip(schema_strings) + .filter_map(|(maybe_id, maybe_schema_string)| { + if let (Some(id), Some(schema_string)) = (maybe_id, maybe_schema_string) { + Some(Metadata::new( + id, + Format { + provider: "parquet".into(), + options: Default::default(), + }, + schema_string, + Vec::::new(), + None, + )) + } else { + None + } + }) + .next(); + + if metadata.is_none() { + return Ok(Box::new(std::iter::empty())); + } + let mut metadata = metadata.unwrap(); + + metadata.partition_columns = cast_struct_column::(arr, "partitionColumns") + .ok() + .map(|arr| { + arr.iter() + .filter_map(|it| { + if let Some(features) = it { + let vals = features + .as_any() + .downcast_ref::()? 
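+                                // Each list element is itself a `StringArray` of
+                                // partition column names; all present values are
+                                // flattened into a single Vec below.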
+ .iter() + .filter_map(|v| v.map(|inner| inner.to_owned())) + .collect::>(); + Some(vals) + } else { + None + } + }) + .flatten() + .collect::>() + }) + .unwrap_or_default(); + + metadata.name = cast_struct_column::(arr, "name") + .ok() + .and_then(|arr| { + arr.iter() + .flat_map(|maybe| maybe.map(|v| v.to_string())) + .next() + }); + metadata.description = cast_struct_column::(arr, "description") + .ok() + .and_then(|arr| { + arr.iter() + .flat_map(|maybe| maybe.map(|v| v.to_string())) + .next() + }); + metadata.created_time = cast_struct_column::(arr, "createdTime") + .ok() + .and_then(|arr| arr.iter().flatten().next()); + + if let Ok(config) = cast_struct_column::(arr, "configuration") { + let keys = config + .keys() + .as_any() + .downcast_ref::() + .ok_or(Error::MissingData("expected key column in map".into()))?; + let values = config + .values() + .as_any() + .downcast_ref::() + .ok_or(Error::MissingData("expected value column in map".into()))?; + metadata.configuration = keys + .into_iter() + .zip(values) + .filter_map(|(k, v)| k.map(|key| (key.to_string(), v.map(|vv| vv.to_string())))) + .collect::>(); + }; + + Ok(Box::new(std::iter::once(Action::Metadata(metadata)))) +} + +fn parse_action_protocol(arr: &StructArray) -> DeltaResult>> { + let min_reader = cast_struct_column::(arr, "minReaderVersion")?; + let min_writer = cast_struct_column::(arr, "minWriterVersion")?; + let protocol = min_reader + .into_iter() + .zip(min_writer) + .filter_map(|(r, w)| { + if let (Some(min_reader_version), Some(min_wrriter_version)) = (r, w) { + Some(Protocol::new(min_reader_version, min_wrriter_version)) + } else { + None + } + }) + .next(); + + if protocol.is_none() { + return Ok(Box::new(std::iter::empty())); + } + let mut protocol = protocol.unwrap(); + + protocol.reader_features = cast_struct_column::(arr, "readerFeatures") + .ok() + .map(|arr| { + arr.iter() + .filter_map(|it| { + if let Some(features) = it { + let vals = features + .as_any() + .downcast_ref::()? + .iter() + .filter_map(|v| v.map(|inner| inner.to_owned())) + .collect::>(); + Some(vals) + } else { + None + } + }) + .flatten() + .collect::>() + }); + + protocol.writer_features = cast_struct_column::(arr, "writerFeatures") + .ok() + .map(|arr| { + arr.iter() + .filter_map(|it| { + if let Some(features) = it { + let vals = features + .as_any() + .downcast_ref::()? + .iter() + .filter_map(|v| v.map(|inner| inner.to_string())) + .collect::>(); + Some(vals) + } else { + None + } + }) + .flatten() + .collect::>() + }); + + Ok(Box::new(std::iter::once(Action::Protocol(protocol)))) +} + +fn parse_actions_add(arr: &StructArray) -> DeltaResult + '_>> { + let paths = cast_struct_column::(arr, "path")?; + let sizes = cast_struct_column::(arr, "size")?; + let modification_times = cast_struct_column::(arr, "modificationTime")?; + let data_changes = cast_struct_column::(arr, "dataChange")?; + let partition_values = cast_struct_column::(arr, "partitionValues")? 
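+    // Optional columns are handled with `Either` throughout this function:
+    // `Left` wraps the real column's iterator when the column exists, `Right`
+    // yields `None` for every row, so `izip!` can zip all columns regardless.
+    // A sketch of the pattern (`len` standing in for the row count):
+    //
+    //     let base_row_ids = if let Ok(col) = cast_struct_column::<Int64Array>(arr, "baseRowId") {
+    //         Either::Left(col.into_iter())
+    //     } else {
+    //         Either::Right(std::iter::repeat(None).take(len))
+    //     };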
+ .iter() + .map(|data| data.map(|d| struct_array_to_map(&d).unwrap())); + + let tags = if let Ok(stats) = cast_struct_column::(arr, "tags") { + Either::Left( + stats + .iter() + .map(|data| data.map(|d| struct_array_to_map(&d).unwrap())), + ) + } else { + Either::Right(std::iter::repeat(None).take(sizes.len())) + }; + + let stats = if let Ok(stats) = cast_struct_column::(arr, "stats") { + Either::Left(stats.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(sizes.len())) + }; + + let base_row_ids = if let Ok(row_ids) = cast_struct_column::(arr, "baseRowId") { + Either::Left(row_ids.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(sizes.len())) + }; + + let commit_versions = + if let Ok(versions) = cast_struct_column::(arr, "defaultRowCommitVersion") { + Either::Left(versions.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(sizes.len())) + }; + + let deletion_vectors = if let Ok(dvs) = cast_struct_column::(arr, "deletionVector") + { + Either::Left(parse_dv(dvs)?) + } else { + Either::Right(std::iter::repeat(None).take(sizes.len())) + }; + + let zipped = izip!( + paths, + sizes, + modification_times, + data_changes, + partition_values, + stats, + tags, + base_row_ids, + commit_versions, + deletion_vectors, + ); + let zipped = zipped.map( + |( + maybe_paths, + maybe_size, + maybe_modification_time, + maybe_data_change, + partition_values, + stat, + tags, + base_row_id, + default_row_commit_version, + deletion_vector, + )| { + if let (Some(path), Some(size), Some(modification_time), Some(data_change)) = ( + maybe_paths, + maybe_size, + maybe_modification_time, + maybe_data_change, + ) { + Some(Add { + path: path.into(), + size, + modification_time, + data_change, + partition_values: partition_values.unwrap_or_default(), + stats: stat.map(|v| v.to_string()), + tags: tags.unwrap_or_default(), + base_row_id, + default_row_commit_version, + deletion_vector, + }) + } else { + None + } + }, + ); + + Ok(Box::new(zipped.flatten().map(Action::Add))) +} + +fn parse_actions_remove(arr: &StructArray) -> DeltaResult + '_>> { + let paths = cast_struct_column::(arr, "path")?; + let data_changes = cast_struct_column::(arr, "dataChange")?; + + let deletion_timestamps = + if let Ok(ts) = cast_struct_column::(arr, "deletionTimestamp") { + Either::Left(ts.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let extended_file_metadata = + if let Ok(metas) = cast_struct_column::(arr, "extendedFileMetadata") { + Either::Left(metas.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let partition_values = + if let Ok(values) = cast_struct_column::(arr, "partitionValues") { + Either::Left( + values + .iter() + .map(|data| data.map(|d| struct_array_to_map(&d).unwrap())), + ) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let sizes = if let Ok(size) = cast_struct_column::(arr, "size") { + Either::Left(size.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let tags = if let Ok(tags) = cast_struct_column::(arr, "tags") { + Either::Left( + tags.iter() + .map(|data| data.map(|d| struct_array_to_map(&d).unwrap())), + ) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let deletion_vectors = if let Ok(dvs) = cast_struct_column::(arr, "deletionVector") + { + Either::Left(parse_dv(dvs)?) 
+ } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let base_row_ids = if let Ok(row_ids) = cast_struct_column::(arr, "baseRowId") { + Either::Left(row_ids.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let commit_versions = + if let Ok(row_ids) = cast_struct_column::(arr, "defaultRowCommitVersion") { + Either::Left(row_ids.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(data_changes.len())) + }; + + let zipped = izip!( + paths, + data_changes, + deletion_timestamps, + extended_file_metadata, + partition_values, + sizes, + tags, + deletion_vectors, + base_row_ids, + commit_versions, + ); + + let zipped = zipped.map( + |( + maybe_paths, + maybe_data_change, + deletion_timestamp, + extended_file_metadata, + partition_values, + size, + tags, + deletion_vector, + base_row_id, + default_row_commit_version, + )| { + if let (Some(path), Some(data_change)) = (maybe_paths, maybe_data_change) { + Some(Remove { + path: path.into(), + data_change, + deletion_timestamp, + extended_file_metadata, + partition_values, + size, + tags, + deletion_vector, + base_row_id, + default_row_commit_version, + }) + } else { + None + } + }, + ); + + Ok(Box::new(zipped.flatten().map(Action::Remove))) +} + +fn parse_dv( + arr: &StructArray, +) -> DeltaResult> + '_> { + let storage_types = cast_struct_column::(arr, "storageType")?; + let paths_or_inlines = cast_struct_column::(arr, "pathOrInlineDv")?; + let sizes_in_bytes = cast_struct_column::(arr, "sizeInBytes")?; + let cardinalities = cast_struct_column::(arr, "cardinality")?; + + let offsets = if let Ok(offsets) = cast_struct_column::(arr, "offset") { + Either::Left(offsets.into_iter()) + } else { + Either::Right(std::iter::repeat(None).take(cardinalities.len())) + }; + + let zipped = izip!( + storage_types, + paths_or_inlines, + sizes_in_bytes, + cardinalities, + offsets, + ); + + Ok(zipped.map( + |(maybe_type, maybe_path_or_inline_dv, maybe_size_in_bytes, maybe_cardinality, offset)| { + if let ( + Some(storage_type), + Some(path_or_inline_dv), + Some(size_in_bytes), + Some(cardinality), + ) = ( + maybe_type, + maybe_path_or_inline_dv, + maybe_size_in_bytes, + maybe_cardinality, + ) { + Some(DeletionVectorDescriptor { + storage_type: storage_type.into(), + path_or_inline_dv: path_or_inline_dv.into(), + size_in_bytes, + cardinality, + offset, + }) + } else { + None + } + }, + )) +} + +fn cast_struct_column(arr: &StructArray, name: impl AsRef) -> DeltaResult<&T> { + arr.column_by_name(name.as_ref()) + .ok_or(Error::MissingColumn(name.as_ref().into()))? 
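+        // The column exists at this point; it is then downcast to the concrete
+        // Arrow array type requested by the caller, e.g.
+        // `cast_struct_column::<StringArray>(arr, "path")`.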
+ .as_any() + .downcast_ref::() + .ok_or(Error::UnexpectedColumnType( + "Cannot downcast to expected type".into(), + )) +} + +fn struct_array_to_map(arr: &StructArray) -> DeltaResult>> { + let keys = cast_struct_column::(arr, "key")?; + let values = cast_struct_column::(arr, "value")?; + Ok(keys + .into_iter() + .zip(values) + .filter_map(|(k, v)| k.map(|key| (key.to_string(), v.map(|vv| vv.to_string())))) + .collect()) +} + +#[cfg(all(test, feature = "default-client"))] +mod tests { + use std::sync::Arc; + + use object_store::local::LocalFileSystem; + + use super::*; + use crate::actions::Protocol; + use crate::client::json::DefaultJsonHandler; + use crate::executor::tokio::TokioBackgroundExecutor; + use crate::JsonHandler; + + fn action_batch() -> RecordBatch { + let store = Arc::new(LocalFileSystem::new()); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + + let json_strings: StringArray = vec![ + r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, + r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#, + r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, + r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, + ] + .into(); + let output_schema = Arc::new(get_log_schema()); + handler.parse_json(json_strings, output_schema).unwrap() + } + + #[test] + fn test_parse_protocol() { + let batch = action_batch(); + let action = parse_action(&batch, &ActionType::Protocol) + .unwrap() + .collect::>(); + let expected = Action::Protocol(Protocol { + min_reader_version: 3, + min_writer_version: 7, + reader_features: Some(vec!["deletionVectors".into()]), + writer_features: Some(vec!["deletionVectors".into()]), + }); + assert_eq!(action[0], expected) + } + + #[test] + fn test_parse_metadata() { + let batch = action_batch(); + let action = parse_action(&batch, &ActionType::Metadata) + .unwrap() + .collect::>(); + let configuration = HashMap::from_iter([ + ( + "delta.enableDeletionVectors".to_string(), + Some("true".to_string()), + ), + ( + "delta.columnMapping.mode".to_string(), + Some("none".to_string()), + ), + ]); + let expected = Action::Metadata(Metadata { + id: "testId".into(), + name: None, + description: None, + format: Format { + provider: "parquet".into(), + options: Default::default(), + }, + schema_string: r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(), + partition_columns: Vec::new(), + created_time: 
Some(1677811175819), + configuration, + }); + assert_eq!(action[0], expected) + } + + #[test] + fn test_parse_add_partitioned() { + let store = Arc::new(LocalFileSystem::new()); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + + let json_strings: StringArray = vec![ + r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#, + r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, + r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, + r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#, + r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#, + r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#, + ] + .into(); + let output_schema = Arc::new(get_log_schema()); + let batch = handler.parse_json(json_strings, output_schema).unwrap(); + + let actions = parse_action(&batch, &ActionType::Add) + .unwrap() + .collect::>(); + println!("{:?}", actions) + } +} diff --git a/rust/src/kernel/actions/schemas.rs b/rust/src/kernel/actions/schemas.rs new file mode 100644 index 0000000000..f93959ad64 --- /dev/null +++ b/rust/src/kernel/actions/schemas.rs @@ -0,0 +1,252 @@ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Fields, Schema}; + +use super::ActionType; + +impl ActionType { + pub fn field(&self) -> Field { + match self { + Self::Add => get_root("add", self.fields()), + Self::Cdc => get_root("cdc", self.fields()), + Self::CommitInfo => get_root("commitInfo", self.fields()), + Self::DomainMetadata => get_root("domainMetadata", self.fields()), + Self::Metadata => get_root("metaData", self.fields()), + Self::Protocol => get_root("protocol", self.fields()), + Self::Remove => get_root("remove", self.fields()), + Self::RowIdHighWaterMark => get_root("rowIdHighWaterMark", self.fields()), + Self::Txn => get_root("txn", self.fields()), + } + } + + pub fn fields(&self) -> Vec { + match self { + Self::Add => add_fields(), + Self::Cdc => cdc_fields(), + Self::CommitInfo => commit_info_fields(), + Self::DomainMetadata => domain_metadata_fields(), + Self::Metadata => metadata_fields(), + 
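+            // Each branch mirrors the schema of the corresponding action in the
+            // Delta protocol; `get_log_schema` below assembles them into the full
+            // log schema. A usage sketch:
+            //
+            //     let schema = get_log_schema();
+            //     let add = schema.field_with_name("add")?; // struct of add_fields()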
Self::Protocol => protocol_fields(), + Self::Remove => remove_fields(), + Self::RowIdHighWaterMark => watermark_fields(), + Self::Txn => txn_fields(), + } + } +} + +pub fn get_log_schema() -> Schema { + Schema { + fields: Fields::from_iter([ + ActionType::Add.field(), + ActionType::Cdc.field(), + ActionType::CommitInfo.field(), + ActionType::DomainMetadata.field(), + ActionType::Metadata.field(), + ActionType::Protocol.field(), + ActionType::Remove.field(), + ActionType::RowIdHighWaterMark.field(), + ActionType::Txn.field(), + ]), + metadata: Default::default(), + } +} + +fn get_root(name: &str, fields: Vec) -> Field { + Field::new(name, DataType::Struct(Fields::from_iter(fields)), true) +} + +fn add_fields() -> Vec { + Vec::from_iter([ + Field::new("path", DataType::Utf8, false), + Field::new("size", DataType::Int64, false), + Field::new("modificationTime", DataType::Int64, false), + Field::new("dataChange", DataType::Boolean, false), + Field::new("stats", DataType::Utf8, true), + Field::new( + "partitionValues", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new( + "tags", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new( + "deletionVector", + DataType::Struct(Fields::from(vec![ + Field::new("storageType", DataType::Utf8, false), + Field::new("pathOrInlineDv", DataType::Utf8, false), + Field::new("offset", DataType::Int32, true), + Field::new("sizeInBytes", DataType::Int32, false), + Field::new("cardinality", DataType::Int64, false), + ])), + true, + ), + Field::new("baseRowId", DataType::Int64, true), + Field::new("defaultRowCommitVersion", DataType::Int64, true), + ]) +} + +fn cdc_fields() -> Vec { + Vec::from_iter([ + Field::new("path", DataType::Utf8, true), + Field::new( + "partitionValues", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new("size", DataType::Int64, true), + Field::new("dataChange", DataType::Boolean, true), + Field::new( + "tags", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + ]) +} + +fn remove_fields() -> Vec { + Vec::from_iter([ + Field::new("path", DataType::Utf8, true), + Field::new("deletionTimestamp", DataType::Int64, true), + Field::new("dataChange", DataType::Boolean, true), + Field::new("extendedFileMetadata", DataType::Boolean, true), + Field::new("size", DataType::Int64, true), + Field::new( + "partitionValues", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new( + "tags", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + ]) +} + +fn metadata_fields() -> Vec { + Vec::from_iter([ + Field::new("id", DataType::Utf8, false), + Field::new("name", DataType::Utf8, true), + Field::new("description", DataType::Utf8, true), + Field::new( + "format", + DataType::Struct(Fields::from_iter([ + Field::new("provider", DataType::Utf8, true), + Field::new( + "options", + DataType::Map( + Arc::new(Field::new( + "key_value", + DataType::Struct(Fields::from_iter([ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, true), + ])), + false, + )), + false, + ), + false, + ), + ])), + false, + ), + Field::new("schemaString", DataType::Utf8, false), + Field::new("createdTime", DataType::Int64, true), + Field::new( + "partitionColumns", + DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), + false, + ), + Field::new( + "configuration", + DataType::Map( + Arc::new(Field::new( + "key_value", + DataType::Struct(Fields::from_iter([ + Field::new("key", DataType::Utf8, false), + Field::new("value", 
DataType::Utf8, true), + ])), + false, + )), + false, + ), + true, + ), + ]) +} + +fn protocol_fields() -> Vec { + Vec::from_iter([ + Field::new("minReaderVersion", DataType::Int32, false), + Field::new("minWriterVersion", DataType::Int32, false), + Field::new( + "readerFeatures", + DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), + true, + ), + Field::new( + "writerFeatures", + DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), + true, + ), + ]) +} + +fn txn_fields() -> Vec { + Vec::from_iter([ + Field::new("appId", DataType::Utf8, true), + Field::new("version", DataType::Int64, true), + Field::new("lastUpdated", DataType::Int64, true), + ]) +} + +fn watermark_fields() -> Vec { + Vec::from_iter([Field::new("highWaterMark", DataType::Int64, true)]) +} + +fn commit_info_fields() -> Vec { + Vec::from_iter([ + Field::new("timestamp", DataType::Int64, true), + Field::new("operation", DataType::Utf8, true), + Field::new("isolationLevel", DataType::Utf8, true), + Field::new("isBlindAppend", DataType::Boolean, true), + Field::new("txnId", DataType::Utf8, true), + Field::new("readVersion", DataType::Int32, true), + Field::new( + "operationParameters", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new( + "operationMetrics", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + ]) +} + +fn domain_metadata_fields() -> Vec { + Vec::from_iter([ + Field::new("domain", DataType::Utf8, true), + Field::new( + "configuration", + DataType::Map(Arc::new(get_map_field()), false), + true, + ), + Field::new("removed", DataType::Boolean, true), + ]) +} + +fn get_map_field() -> Field { + Field::new( + "key_value", + DataType::Struct(Fields::from_iter([ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Utf8, true), + ])), + false, + ) +} diff --git a/rust/src/kernel/actions/types.rs b/rust/src/kernel/actions/types.rs new file mode 100644 index 0000000000..ed471f1e4c --- /dev/null +++ b/rust/src/kernel/actions/types.rs @@ -0,0 +1,462 @@ +use std::collections::HashMap; +use std::io::{Cursor, Read}; +use std::sync::Arc; + +use roaring::RoaringTreemap; +use url::Url; + +use super::super::schema::StructType; +use crate::{DeltaResult, Error, FileSystemClient}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Format { + /// Name of the encoding for files in this table + pub provider: String, + /// A map containing configuration options for the format + pub options: HashMap, +} + +impl Default for Format { + fn default() -> Self { + Self { + provider: String::from("parquet"), + options: HashMap::new(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Metadata { + /// Unique identifier for this table + pub id: String, + /// User-provided identifier for this table + pub name: Option, + /// User-provided description for this table + pub description: Option, + /// Specification of the encoding for the files stored in the table + pub format: Format, + /// Schema of the table + pub schema_string: String, + /// Column names by which the data should be partitioned + pub partition_columns: Vec, + /// The time when this metadata action is created, in milliseconds since the Unix epoch + pub created_time: Option, + /// Configuration options for the metadata action + pub configuration: HashMap>, +} + +impl Metadata { + pub fn new( + id: impl Into, + format: Format, + schema_string: impl Into, + partition_columns: impl IntoIterator>, + configuration: Option>>, + ) -> Self { + Self { + id: id.into(), + format, 
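+            // Construction sketch (the `with_*` builders below fill the optional
+            // fields; `schema_json` stands in for a Delta schema string):
+            //
+            //     let metadata = Metadata::new("id", Format::default(), schema_json, ["date"], None)
+            //         .with_name("my_table")
+            //         .with_created_time(1677811175819);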
+ schema_string: schema_string.into(), + partition_columns: partition_columns.into_iter().map(|c| c.into()).collect(), + configuration: configuration.unwrap_or_default(), + name: None, + description: None, + created_time: None, + } + } + + pub fn with_name(mut self, name: impl Into) -> Self { + self.name = Some(name.into()); + self + } + + pub fn with_description(mut self, description: impl Into) -> Self { + self.description = Some(description.into()); + self + } + + pub fn with_created_time(mut self, created_time: i64) -> Self { + self.created_time = Some(created_time); + self + } + + pub fn schema(&self) -> DeltaResult { + Ok(serde_json::from_str(&self.schema_string)?) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Protocol { + /// The minimum version of the Delta read protocol that a client must implement + /// in order to correctly read this table + pub min_reader_version: i32, + /// The minimum version of the Delta write protocol that a client must implement + /// in order to correctly write this table + pub min_writer_version: i32, + /// A collection of features that a client must implement in order to correctly + /// read this table (exist only when minReaderVersion is set to 3) + pub reader_features: Option>, + /// A collection of features that a client must implement in order to correctly + /// write this table (exist only when minWriterVersion is set to 7) + pub writer_features: Option>, +} + +impl Protocol { + pub fn new(min_reader_version: i32, min_wrriter_version: i32) -> Self { + Self { + min_reader_version, + min_writer_version: min_wrriter_version, + reader_features: None, + writer_features: None, + } + } + + pub fn with_reader_features( + mut self, + reader_features: impl IntoIterator>, + ) -> Self { + self.reader_features = Some(reader_features.into_iter().map(|c| c.into()).collect()); + self + } + + pub fn with_writer_features( + mut self, + writer_features: impl IntoIterator>, + ) -> Self { + self.writer_features = Some(writer_features.into_iter().map(|c| c.into()).collect()); + self + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeletionVectorDescriptor { + /// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p']. + pub storage_type: String, + + /// Three format options are currently proposed: + /// - If `storageType = 'u'` then ``: + /// The deletion vector is stored in a file with a path relative to the data + /// directory of this Delta table, and the file name can be reconstructed from + /// the UUID. See Derived Fields for how to reconstruct the file name. The random + /// prefix is recovered as the extra characters before the (20 characters fixed length) uuid. + /// - If `storageType = 'i'` then ``: The deletion vector + /// is stored inline in the log. The format used is the `RoaringBitmapArray` + /// format also used when the DV is stored on disk and described in [Deletion Vector Format]. + /// - If `storageType = 'p'` then ``: The DV is stored in a file with an + /// absolute path given by this path, which has the same format as the `path` field + /// in the `add`/`remove` actions. + /// + /// [Deletion Vector Format]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Deletion-Vector-Format + pub path_or_inline_dv: String, + + /// Start of the data for this DV in number of bytes from the beginning of the file it is stored in. + /// Always None (absent in JSON) when `storageType = 'i'`. + pub offset: Option, + + /// Size of the serialized DV in bytes (raw data size, i.e. 
before base85 encoding, if inline).
+    pub size_in_bytes: i32,
+
+    /// Number of rows the given DV logically removes from the file.
+    pub cardinality: i64,
+}
+
+impl DeletionVectorDescriptor {
+    pub fn unique_id(&self) -> String {
+        if let Some(offset) = self.offset {
+            format!("{}{}@{offset}", self.storage_type, self.path_or_inline_dv)
+        } else {
+            format!("{}{}", self.storage_type, self.path_or_inline_dv)
+        }
+    }
+
+    pub fn absolute_path(&self, parent: &Url) -> DeltaResult<Option<Url>> {
+        match self.storage_type.as_str() {
+            "u" => {
+                let prefix_len = self.path_or_inline_dv.len() as i32 - 20;
+                if prefix_len < 0 {
+                    return Err(Error::DeletionVector("Invalid length".to_string()));
+                }
+                let decoded = z85::decode(&self.path_or_inline_dv[(prefix_len as usize)..])
+                    .map_err(|_| Error::DeletionVector("Failed to decode DV uuid".to_string()))?;
+                let uuid = uuid::Uuid::from_slice(&decoded)
+                    .map_err(|err| Error::DeletionVector(err.to_string()))?;
+                let mut dv_suffix = format!("deletion_vector_{uuid}.bin");
+                if prefix_len > 0 {
+                    dv_suffix = format!(
+                        "{}/{}",
+                        &self.path_or_inline_dv[..(prefix_len as usize)],
+                        dv_suffix
+                    );
+                }
+                let dv_path = parent
+                    .join(&dv_suffix)
+                    .map_err(|_| Error::DeletionVector(format!("invalid path: {}", dv_suffix)))?;
+                Ok(Some(dv_path))
+            }
+            "p" => Ok(Some(Url::parse(&self.path_or_inline_dv).map_err(|_| {
+                Error::DeletionVector(format!("invalid path: {}", self.path_or_inline_dv))
+            })?)),
+            "i" => Ok(None),
+            other => Err(Error::DeletionVector(format!(
+                "Unknown storage format: '{other}'."
+            ))),
+        }
+    }
+
+    // TODO read only required byte ranges
+    pub fn read(
+        &self,
+        fs_client: Arc<dyn FileSystemClient>,
+        parent: Url,
+    ) -> DeltaResult<RoaringTreemap> {
+        match self.absolute_path(&parent)? {
+            None => {
+                let bytes = z85::decode(&self.path_or_inline_dv)
+                    .map_err(|_| Error::DeletionVector("Failed to decode DV".to_string()))?;
+                RoaringTreemap::deserialize_from(&bytes[12..])
+                    .map_err(|err| Error::DeletionVector(err.to_string()))
+            }
+            Some(path) => {
+                let offset = self.offset;
+                let size_in_bytes = self.size_in_bytes;
+
+                println!("path --> : {}", path);
+                println!("offset --> : {:?}", offset);
+                println!("size_in_bytes --> : {}", size_in_bytes);
+
+                let dv_data = fs_client
+                    .read_files(vec![(path, None)])?
+                    .next()
+                    .ok_or(Error::MissingData("No deletion vector data".to_string()))??;
+
+                let mut cursor = Cursor::new(dv_data);
+                if let Some(offset) = offset {
+                    // TODO should we read the datasize from the DV file?
+                    // offset plus datasize bytes
+                    cursor.set_position((offset + 4) as u64);
+                }
+
+                let mut buf = vec![0; 4];
+                cursor
+                    .read_exact(&mut buf)
+                    .map_err(|err| Error::DeletionVector(err.to_string()))?;
+                let magic = i32::from_le_bytes(buf.try_into().map_err(|_| {
+                    Error::DeletionVector("failed to read magic bytes".to_string())
+                })?);
+                println!("magic --> : {}", magic);
+                // assert!(magic == 1681511377);
+
+                let mut buf = vec![0; size_in_bytes as usize];
+                cursor
+                    .read_exact(&mut buf)
+                    .map_err(|err| Error::DeletionVector(err.to_string()))?;
+
+                RoaringTreemap::deserialize_from(Cursor::new(buf))
+                    .map_err(|err| Error::DeletionVector(err.to_string()))
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Add {
+    /// A relative path to a data file from the root of the table or an absolute path to a file
+    /// that should be added to the table. The path is a URI as specified by
+    /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
+    ///
+    /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
+    pub path: String,
+
+    /// A map from partition column to value for this logical file.
+    pub partition_values: HashMap<String, Option<String>>,
+
+    /// The size of this data file in bytes
+    pub size: i64,
+
+    /// The time this logical file was created, as milliseconds since the epoch.
+    pub modification_time: i64,
+
+    /// When `false` the logical file must already be present in the table or the records
+    /// in the added file must be contained in one or more remove actions in the same version.
+    pub data_change: bool,
+
+    /// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
+    ///
+    /// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
+    pub stats: Option<String>,
+
+    /// Map containing metadata about this logical file.
+    pub tags: HashMap<String, Option<String>>,
+
+    /// Information about deletion vector (DV) associated with this add action
+    pub deletion_vector: Option<DeletionVectorDescriptor>,
+
+    /// Default generated Row ID of the first row in the file. The default generated Row IDs
+    /// of the other rows in the file can be reconstructed by adding the physical index of the
+    /// row within the file to the base Row ID.
+    pub base_row_id: Option<i64>,
+
+    /// First commit version in which an add action with the same path was committed to the table.
+    pub default_row_commit_version: Option<i64>,
+}
+
+impl Add {
+    pub fn dv_unique_id(&self) -> Option<String> {
+        self.deletion_vector.clone().map(|dv| dv.unique_id())
+    }
+
+    pub fn with_base_row_id(mut self, base_row_id: i64) -> Self {
+        self.base_row_id = Some(base_row_id);
+        self
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Remove {
+    /// A relative path to a data file from the root of the table or an absolute path to a file
+    /// that should be removed from the table. The path is a URI as specified by
+    /// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
+    ///
+    /// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
+    pub path: String,
+
+    /// When `false` the records in the removed file must be contained in one or more add
+    /// actions in the same version.
+    pub data_change: bool,
+
+    /// The time the deletion occurred, represented as milliseconds since the epoch.
+    pub deletion_timestamp: Option<i64>,
+
+    /// When true the fields `partition_values`, `size`, and `tags` are present
+    pub extended_file_metadata: Option<bool>,
+
+    /// A map from partition column to value for this logical file.
+    pub partition_values: Option<HashMap<String, Option<String>>>,
+
+    /// The size of this data file in bytes
+    pub size: Option<i64>,
+
+    /// Map containing metadata about this logical file.
+    pub tags: Option<HashMap<String, Option<String>>>,
+
+    /// Information about deletion vector (DV) associated with this remove action
+    pub deletion_vector: Option<DeletionVectorDescriptor>,
+
+    /// Default generated Row ID of the first row in the file. The default generated Row IDs
+    /// of the other rows in the file can be reconstructed by adding the physical index of the
+    /// row within the file to the base Row ID.
+    pub base_row_id: Option<i64>,
+
+    /// First commit version in which an add action with the same path was committed to the table.
+ pub default_row_commit_version: Option, +} + +impl Remove { + pub fn dv_unique_id(&self) -> Option { + self.deletion_vector.clone().map(|dv| dv.unique_id()) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + use std::sync::Arc; + + use object_store::local::LocalFileSystem; + + use super::*; + use crate::client::filesystem::ObjectStoreFileSystemClient; + use crate::executor::tokio::TokioBackgroundExecutor; + + fn dv_relateive() -> DeletionVectorDescriptor { + DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "ab^-aqEH.-t@S}K{vb[*k^".to_string(), + offset: Some(4), + size_in_bytes: 40, + cardinality: 6, + } + } + + fn dv_absolute() -> DeletionVectorDescriptor { + DeletionVectorDescriptor { + storage_type: "p".to_string(), + path_or_inline_dv: + "s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin".to_string(), + offset: Some(4), + size_in_bytes: 40, + cardinality: 6, + } + } + + fn dv_inline() -> DeletionVectorDescriptor { + DeletionVectorDescriptor { + storage_type: "i".to_string(), + path_or_inline_dv: "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L".to_string(), + offset: None, + size_in_bytes: 40, + cardinality: 6, + } + } + + fn dv_example() -> DeletionVectorDescriptor { + DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(), + offset: Some(1), + size_in_bytes: 36, + cardinality: 2, + } + } + + #[test] + fn test_deletion_vector_absolute_path() { + let parent = Url::parse("s3://mytable/").unwrap(); + + let relative = dv_relateive(); + let expected = + Url::parse("s3://mytable/ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin") + .unwrap(); + assert_eq!(expected, relative.absolute_path(&parent).unwrap().unwrap()); + + let absolute = dv_absolute(); + let expected = + Url::parse("s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin") + .unwrap(); + assert_eq!(expected, absolute.absolute_path(&parent).unwrap().unwrap()); + + let inline = dv_inline(); + assert_eq!(None, inline.absolute_path(&parent).unwrap()); + + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let parent = url::Url::from_directory_path(path).unwrap(); + let dv_url = parent + .join("deletion_vector_61d16c75-6994-46b7-a15b-8b538852e50e.bin") + .unwrap(); + let example = dv_example(); + assert_eq!(dv_url, example.absolute_path(&parent).unwrap().unwrap()); + } + + #[test] + fn test_deletion_vector_read() { + let store = Arc::new(LocalFileSystem::new()); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let parent = url::Url::from_directory_path(path).unwrap(); + let root = object_store::path::Path::from(parent.path()); + let fs_client = Arc::new(ObjectStoreFileSystemClient::new( + store, + root, + Arc::new(TokioBackgroundExecutor::new()), + )); + + let example = dv_example(); + let tree_map = example.read(fs_client, parent).unwrap(); + + let expected: Vec = vec![0, 9]; + let found = tree_map.iter().collect::>(); + assert_eq!(found, expected) + } +} diff --git a/rust/src/kernel/error.rs b/rust/src/kernel/error.rs new file mode 100644 index 0000000000..894421276f --- /dev/null +++ b/rust/src/kernel/error.rs @@ -0,0 +1,61 @@ +pub type DeltaResult = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Arrow error: {0}")] + Arrow(#[from] arrow_schema::ArrowError), + + #[error("Generic delta kernel error: {0}")] + Generic(String), + + 
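+    // The crate-wide `DeltaResult` alias above is used throughout the kernel,
+    // e.g. (with `read_version` as a hypothetical helper):
+    //
+    //     fn read_version() -> DeltaResult<i64> {
+    //         Err(Error::MissingVersion)
+    //     }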
#[error("Generic error: {source}")] + GenericError { + /// Source error + source: Box, + }, + + #[cfg(feature = "parquet")] + #[error("Arrow error: {0}")] + Parquet(#[from] parquet::errors::ParquetError), + + #[cfg(feature = "object_store")] + #[error("Error interacting with object store: {0}")] + ObjectStore(object_store::Error), + + #[error("File not found: {0}")] + FileNotFound(String), + + #[error("{0}")] + MissingColumn(String), + + #[error("Expected column type: {0}")] + UnexpectedColumnType(String), + + #[error("Expected is missing: {0}")] + MissingData(String), + + #[error("No table version found.")] + MissingVersion, + + #[error("Deletion Vector error: {0}")] + DeletionVector(String), + + #[error("Invalid url: {0}")] + InvalidUrl(#[from] url::ParseError), + + #[error("Invalid url: {0}")] + MalformedJson(#[from] serde_json::Error), + + #[error("No table metadata found in delta log.")] + MissingMetadata, +} + +#[cfg(feature = "object_store")] +impl From for Error { + fn from(value: object_store::Error) -> Self { + match value { + object_store::Error::NotFound { path, .. } => Self::FileNotFound(path), + err => Self::ObjectStore(err), + } + } +} diff --git a/rust/src/kernel/mod.rs b/rust/src/kernel/mod.rs new file mode 100644 index 0000000000..6e0eb46391 --- /dev/null +++ b/rust/src/kernel/mod.rs @@ -0,0 +1,5 @@ +pub mod actions; +pub mod error; +pub mod schema; + +pub use error::*; diff --git a/rust/src/kernel/schema.rs b/rust/src/kernel/schema.rs new file mode 100644 index 0000000000..7613f8c1a5 --- /dev/null +++ b/rust/src/kernel/schema.rs @@ -0,0 +1,554 @@ +use std::fmt::Formatter; +use std::sync::Arc; +use std::{collections::HashMap, fmt::Display}; + +use serde::{Deserialize, Serialize}; + +pub type Schema = StructType; +pub type SchemaRef = Arc; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(untagged)] +pub enum MetadataValue { + Number(i32), + String(String), +} + +impl From for MetadataValue { + fn from(value: String) -> Self { + Self::String(value) + } +} + +impl From<&String> for MetadataValue { + fn from(value: &String) -> Self { + Self::String(value.clone()) + } +} + +impl From for MetadataValue { + fn from(value: i32) -> Self { + Self::Number(value) + } +} + +#[derive(Debug)] +pub enum ColumnMetadataKey { + ColumnMappingId, + ColumnMappingPhysicalName, + GenerationExpression, + IdentityStart, + IdentityStep, + IdentityHighWaterMark, + IdentityAllowExplicitInsert, + Invariants, +} + +impl AsRef for ColumnMetadataKey { + fn as_ref(&self) -> &str { + match self { + Self::ColumnMappingId => "delta.columnMapping.id", + Self::ColumnMappingPhysicalName => "delta.columnMapping.physicalName", + Self::GenerationExpression => "delta.generationExpression", + Self::IdentityAllowExplicitInsert => "delta.identity.allowExplicitInsert", + Self::IdentityHighWaterMark => "delta.identity.highWaterMark", + Self::IdentityStart => "delta.identity.start", + Self::IdentityStep => "delta.identity.step", + Self::Invariants => "delta.invariants", + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +pub struct StructField { + /// Name of this (possibly nested) column + pub name: String, + /// The data type of this field + #[serde(rename = "type")] + pub data_type: DataType, + /// Denotes whether this Field can be null + pub nullable: bool, + /// A JSON map containing information about this column + pub metadata: HashMap, +} + +impl StructField { + /// Creates a new field + pub fn new(name: impl Into, data_type: DataType, nullable: bool) -> Self { + Self { 
+ name: name.into(), + data_type, + nullable, + metadata: HashMap::default(), + } + } + + pub fn with_metadata( + mut self, + metadata: impl IntoIterator, impl Into)>, + ) -> Self { + self.metadata = metadata + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + self + } + + pub fn get_config_value(&self, key: &ColumnMetadataKey) -> Option<&MetadataValue> { + self.metadata.get(key.as_ref()) + } + + #[inline] + pub fn name(&self) -> &String { + &self.name + } + + #[inline] + pub fn is_nullable(&self) -> bool { + self.nullable + } + + #[inline] + pub const fn data_type(&self) -> &DataType { + &self.data_type + } + + #[inline] + pub const fn metadata(&self) -> &HashMap { + &self.metadata + } +} + +/// A struct is used to represent both the top-level schema of the table +/// as well as struct columns that contain nested columns. +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +pub struct StructType { + #[serde(rename = "type")] + pub type_name: String, + /// The type of element stored in this array + pub fields: Vec, +} + +impl StructType { + pub fn new(fields: Vec) -> Self { + Self { + type_name: "struct".into(), + fields, + } + } + + pub fn fields(&self) -> Vec<&StructField> { + self.fields.iter().collect() + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ArrayType { + #[serde(rename = "type")] + pub type_name: String, + /// The type of element stored in this array + pub element_type: DataType, + /// Denoting whether this array can contain one or more null values + pub contains_null: bool, +} + +impl ArrayType { + pub fn new(element_type: DataType, contains_null: bool) -> Self { + Self { + type_name: "array".into(), + element_type, + contains_null, + } + } + + #[inline] + pub const fn element_type(&self) -> &DataType { + &self.element_type + } + + #[inline] + pub const fn contains_null(&self) -> bool { + self.contains_null + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub struct MapType { + #[serde(rename = "type")] + pub type_name: String, + /// The type of element used for the key of this map + pub key_type: DataType, + /// The type of element used for the value of this map + pub value_type: DataType, + /// Denoting whether this array can contain one or more null values + #[serde(default = "default_true")] + pub value_contains_null: bool, +} + +impl MapType { + pub fn new(key_type: DataType, value_type: DataType, value_contains_null: bool) -> Self { + Self { + type_name: "map".into(), + key_type, + value_type, + value_contains_null, + } + } + + #[inline] + pub const fn key_type(&self) -> &DataType { + &self.key_type + } + + #[inline] + pub const fn value_type(&self) -> &DataType { + &self.value_type + } + + #[inline] + pub const fn value_contains_null(&self) -> bool { + self.value_contains_null + } +} + +fn default_true() -> bool { + true +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "camelCase")] +pub enum PrimitiveType { + /// UTF-8 encoded string of characters + String, + /// i64: 8-byte signed integer. Range: -9223372036854775808 to 9223372036854775807 + Long, + /// i32: 4-byte signed integer. Range: -2147483648 to 2147483647 + Integer, + /// i16: 2-byte signed integer numbers. Range: -32768 to 32767 + Short, + /// i8: 1-byte signed integer number. 
Range: -128 to 127 + Byte, + /// f32: 4-byte single-precision floating-point numbers + Float, + /// f64: 8-byte double-precision floating-point numbers + Double, + /// bool: boolean values + Boolean, + Binary, + Date, + /// Microsecond precision timestamp, adjusted to UTC. + Timestamp, + // TODO: timestamp without timezone + #[serde( + serialize_with = "serialize_decimal", + deserialize_with = "deserialize_decimal", + untagged + )] + Decimal(i32, i32), +} + +fn serialize_decimal( + precision: &i32, + scale: &i32, + serializer: S, +) -> Result { + serializer.serialize_str(&format!("decimal({},{})", precision, scale)) +} + +fn deserialize_decimal<'de, D>(deserializer: D) -> Result<(i32, i32), D::Error> +where + D: serde::Deserializer<'de>, +{ + let str_value = String::deserialize(deserializer)?; + if !str_value.starts_with("decimal(") || !str_value.ends_with(')') { + return Err(serde::de::Error::custom(format!( + "Invalid decimal: {}", + str_value + ))); + } + + let mut parts = str_value[8..str_value.len() - 1].split(','); + let precision = parts + .next() + .and_then(|part| part.trim().parse::().ok()) + .ok_or_else(|| { + serde::de::Error::custom(format!("Invalid precision in decimal: {}", str_value)) + })?; + let scale = parts + .next() + .and_then(|part| part.trim().parse::().ok()) + .ok_or_else(|| { + serde::de::Error::custom(format!("Invalid scale in decimal: {}", str_value)) + })?; + + Ok((precision, scale)) +} + +impl Display for PrimitiveType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + PrimitiveType::String => write!(f, "string"), + PrimitiveType::Long => write!(f, "bigint"), + PrimitiveType::Integer => write!(f, "int"), + PrimitiveType::Short => write!(f, "smallint"), + PrimitiveType::Byte => write!(f, "tinyint"), + PrimitiveType::Float => write!(f, "float"), + PrimitiveType::Double => write!(f, "double"), + PrimitiveType::Boolean => write!(f, "boolean"), + PrimitiveType::Binary => write!(f, "binary"), + PrimitiveType::Date => write!(f, "date"), + PrimitiveType::Timestamp => write!(f, "timestamp"), + PrimitiveType::Decimal(precision, scale) => { + write!(f, "decimal({}, {})", precision, scale) + } + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(untagged, rename_all = "camelCase")] +pub enum DataType { + /// UTF-8 encoded string of characters + Primitive(PrimitiveType), + /// An array stores a variable length collection of items of some type. + Array(Box), + /// A struct is used to represent both the top-level schema of the table as well + /// as struct columns that contain nested columns. 
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[serde(untagged, rename_all = "camelCase")]
+pub enum DataType {
+    /// A primitive scalar type: string, integer, decimal, etc.
+    Primitive(PrimitiveType),
+    /// An array stores a variable length collection of items of some type.
+    Array(Box<ArrayType>),
+    /// A struct is used to represent both the top-level schema of the table as well
+    /// as struct columns that contain nested columns.
+    Struct(Box<StructType>),
+    /// A map stores an arbitrary length collection of key-value pairs
+    /// with a single keyType and a single valueType
+    Map(Box<MapType>),
+}
+
+impl DataType {
+    pub fn string() -> Self {
+        DataType::Primitive(PrimitiveType::String)
+    }
+
+    pub fn long() -> Self {
+        DataType::Primitive(PrimitiveType::Long)
+    }
+
+    pub fn integer() -> Self {
+        DataType::Primitive(PrimitiveType::Integer)
+    }
+
+    pub fn short() -> Self {
+        DataType::Primitive(PrimitiveType::Short)
+    }
+
+    pub fn byte() -> Self {
+        DataType::Primitive(PrimitiveType::Byte)
+    }
+
+    pub fn float() -> Self {
+        DataType::Primitive(PrimitiveType::Float)
+    }
+
+    pub fn double() -> Self {
+        DataType::Primitive(PrimitiveType::Double)
+    }
+
+    pub fn boolean() -> Self {
+        DataType::Primitive(PrimitiveType::Boolean)
+    }
+
+    pub fn binary() -> Self {
+        DataType::Primitive(PrimitiveType::Binary)
+    }
+
+    pub fn date() -> Self {
+        DataType::Primitive(PrimitiveType::Date)
+    }
+
+    pub fn timestamp() -> Self {
+        DataType::Primitive(PrimitiveType::Timestamp)
+    }
+
+    pub fn decimal(precision: usize, scale: usize) -> Self {
+        DataType::Primitive(PrimitiveType::Decimal(precision as i32, scale as i32))
+    }
+}
+
+impl Display for DataType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DataType::Primitive(p) => write!(f, "{}", p),
+            DataType::Array(a) => write!(f, "array<{}>", a.element_type),
+            DataType::Struct(s) => {
+                write!(f, "struct<")?;
+                for (i, field) in s.fields.iter().enumerate() {
+                    if i > 0 {
+                        write!(f, ", ")?;
+                    }
+                    write!(f, "{}: {}", field.name, field.data_type)?;
+                }
+                write!(f, ">")
+            }
+            DataType::Map(m) => write!(f, "map<{}, {}>", m.key_type, m.value_type),
+        }
+    }
+}
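+
+// Illustrative sketch, not part of the copied kernel sources: composing a
+// nested table schema from the constructors above. Assumes the
+// `StructField::new(name, data_type, nullable)` constructor whose body opens
+// this file.
+#[cfg(test)]
+mod schema_composition_example {
+    use super::*;
+
+    #[test]
+    fn nested_schema_display() {
+        let schema = StructType::new(vec![
+            StructField::new("id", DataType::long(), false),
+            StructField::new(
+                "tags",
+                DataType::Array(Box::new(ArrayType::new(DataType::string(), true))),
+                true,
+            ),
+        ]);
+        // Display renders the SQL-style type names defined above.
+        assert_eq!(
+            DataType::Struct(Box::new(schema)).to_string(),
+            "struct<id: bigint, tags: array<string>>"
+        );
+    }
+}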
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json;
+
+    #[test]
+    fn test_serde_data_types() {
+        let data = r#"
+        {
+            "name": "a",
+            "type": "integer",
+            "nullable": false,
+            "metadata": {}
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+        assert!(matches!(
+            field.data_type,
+            DataType::Primitive(PrimitiveType::Integer)
+        ));
+
+        let data = r#"
+        {
+            "name": "c",
+            "type": {
+                "type": "array",
+                "elementType": "integer",
+                "containsNull": false
+            },
+            "nullable": true,
+            "metadata": {}
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+        assert!(matches!(field.data_type, DataType::Array(_)));
+
+        let data = r#"
+        {
+            "name": "e",
+            "type": {
+                "type": "array",
+                "elementType": {
+                    "type": "struct",
+                    "fields": [
+                        {
+                            "name": "d",
+                            "type": "integer",
+                            "nullable": false,
+                            "metadata": {}
+                        }
+                    ]
+                },
+                "containsNull": true
+            },
+            "nullable": true,
+            "metadata": {}
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+        assert!(matches!(field.data_type, DataType::Array(_)));
+        match field.data_type {
+            DataType::Array(array) => assert!(matches!(array.element_type, DataType::Struct(_))),
+            _ => unreachable!(),
+        }
+
+        let data = r#"
+        {
+            "name": "f",
+            "type": {
+                "type": "map",
+                "keyType": "string",
+                "valueType": "string",
+                "valueContainsNull": true
+            },
+            "nullable": true,
+            "metadata": {}
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+        assert!(matches!(field.data_type, DataType::Map(_)));
+    }
+
+    #[test]
+    fn test_roundtrip_decimal() {
+        let data = r#"
+        {
+            "name": "a",
+            "type": "decimal(10, 2)",
+            "nullable": false,
+            "metadata": {}
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+        assert!(matches!(
+            field.data_type,
+            DataType::Primitive(PrimitiveType::Decimal(10, 2))
+        ));
+
+        let json_str = serde_json::to_string(&field).unwrap();
+        assert_eq!(
+            json_str,
+            r#"{"name":"a","type":"decimal(10,2)","nullable":false,"metadata":{}}"#
+        );
+    }
+
+    #[test]
+    fn test_field_metadata() {
+        let data = r#"
+        {
+            "name": "e",
+            "type": {
+                "type": "array",
+                "elementType": {
+                    "type": "struct",
+                    "fields": [
+                        {
+                            "name": "d",
+                            "type": "integer",
+                            "nullable": false,
+                            "metadata": {
+                                "delta.columnMapping.id": 5,
+                                "delta.columnMapping.physicalName": "col-a7f4159c-53be-4cb0-b81a-f7e5240cfc49"
+                            }
+                        }
+                    ]
+                },
+                "containsNull": true
+            },
+            "nullable": true,
+            "metadata": {
+                "delta.columnMapping.id": 4,
+                "delta.columnMapping.physicalName": "col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
+            }
+        }
+        "#;
+        let field: StructField = serde_json::from_str(data).unwrap();
+
+        let col_id = field
+            .get_config_value(&ColumnMetadataKey::ColumnMappingId)
+            .unwrap();
+        assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
+        let physical_name = field
+            .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
+            .unwrap();
+        assert!(
+            matches!(physical_name, MetadataValue::String(name) if *name == "col-5f422f40-de70-45b2-88ab-1d5c90e94db1")
+        );
+    }
+
+    #[test]
+    fn test_read_schemas() {
+        let file = std::fs::File::open("./tests/serde/schema.json").unwrap();
+        let schema: Result<StructType, serde_json::Error> = serde_json::from_reader(file);
+        assert!(schema.is_ok());
+
+        let file = std::fs::File::open("./tests/serde/checkpoint_schema.json").unwrap();
+        let schema: Result<StructType, serde_json::Error> = serde_json::from_reader(file);
+        assert!(schema.is_ok())
+    }
+}