
feat: add metadata for deletion vectors (delta-io#1583)
# Description
This just adds the deletion vector metadata to the actions. It does not interpret that metadata yet; reading and writing deletion vectors is not supported with this. Still, it enables use cases where delta-rs is used purely for metadata retrieval; a short sketch of that use case follows below.
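
For illustration, a minimal sketch of the metadata-only use case (assumptions on my part: the `deltalake` crate's `open_table` / `get_state().files()` API surface, a tokio runtime, and the test table path used in the tests below):

```rust
use deltalake::{open_table, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Open the table and walk its add actions; no data files are read.
    let table = open_table("./tests/data/table_with_deletion_logs").await?;
    for add in table.get_state().files() {
        if let Some(dv) = &add.deletion_vector {
            println!(
                "{}: {} rows logically deleted ({} bytes, storage type '{}')",
                add.path,
                dv.cardinality,
                dv.size_in_bytes,
                dv.storage_type.to_string()
            );
        }
    }
    Ok(())
}
```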

I have to add that I'm still learning Rust, so I expect this to take a few iterations until the code quality is sufficient.

# Related Issue(s)
Part of delta-io#1094: adds the required metadata.

# Documentation


https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
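
For reference, this is the JSON shape of a deletion vector descriptor as the new struct models it; a minimal sketch (the values echo the test table in this diff, and `deltalake::action` as the public module path is an assumption):

```rust
use deltalake::action::DeletionVector;

fn main() -> serde_json::Result<()> {
    // Shape per PROTOCOL.md: a 'u' (UUID-relative-path) deletion vector.
    let raw = r#"{
        "storageType": "u",
        "pathOrInlineDv": "Q6Kt3y1b)0MgZSWwPunr",
        "offset": 1,
        "sizeInBytes": 36,
        "cardinality": 2
    }"#;
    let dv: DeletionVector = serde_json::from_str(raw)?;
    assert_eq!(dv.offset, Some(1));
    assert_eq!(dv.cardinality, 2);
    Ok(())
}
```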

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
2 people authored and polynomialherder committed Aug 15, 2023
1 parent 838e707 commit f553588
Showing 67 changed files with 449 additions and 5 deletions.
1 change: 1 addition & 0 deletions python/src/error.rs
@@ -68,6 +68,7 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
ProtocolError::CheckpointNotFound => DeltaProtocolError::new_err(err.to_string()),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::InvalidDeletionVectorStorageType(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::IO { source } => PyIOError::new_err(source.to_string()),
2 changes: 2 additions & 0 deletions python/src/lib.rs
@@ -505,6 +505,7 @@ impl RawDeltaTable {
extended_file_metadata: Some(old_add.tags.is_some()),
partition_values: Some(old_add.partition_values.clone()),
size: Some(old_add.size),
deletion_vector: old_add.deletion_vector.clone(),
tags: old_add.tags.clone(),
});
actions.push(remove_action);
@@ -788,6 +789,7 @@ impl From<&PyAddAction> for action::Add {
stats: action.stats.clone(),
stats_parsed: None,
tags: None,
deletion_vector: None,
}
}
}
197 changes: 197 additions & 0 deletions rust/src/action/mod.rs
@@ -22,6 +22,7 @@ use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::str::FromStr;

use crate::delta_config::IsolationLevel;
use crate::errors::DeltaResult;
@@ -49,6 +50,10 @@ pub enum ProtocolError {
#[error("Invalid action in parquet row: {0}")]
InvalidRow(String),

/// A transaction log contains invalid deletion vector storage type
#[error("Invalid deletion vector storage type: {0}")]
InvalidDeletionVectorStorageType(String),

/// A generic action error. The wrapped error string describes the details.
#[error("Generic action error: {0}")]
Generic(String),
@@ -219,6 +224,86 @@ pub struct AddCDCFile {
pub tags: Option<HashMap<String, Option<String>>>,
}

/// Storage type of a deletion vector
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum StorageType {
/// Stored at relative path derived from a UUID.
#[serde(rename = "u")]
UuidRelativePath,
/// Stored as inline string.
#[serde(rename = "i")]
Inline,
/// Stored at an absolute path.
#[serde(rename = "p")]
AbsolutePath,
}

impl Default for StorageType {
fn default() -> Self {
Self::UuidRelativePath // seems to be used by Databricks and therefore most common
}
}

impl FromStr for StorageType {
type Err = ProtocolError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"u" => Ok(Self::UuidRelativePath),
"i" => Ok(Self::Inline),
"p" => Ok(Self::AbsolutePath),
_ => Err(ProtocolError::InvalidDeletionVectorStorageType(
s.to_string(),
)),
}
}
}

impl std::fmt::Display for StorageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UuidRelativePath => write!(f, "u"),
Self::Inline => write!(f, "i"),
Self::AbsolutePath => write!(f, "p"),
}
}
}

/// Describes deleted rows of a parquet file as part of an add or remove action
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct DeletionVector {
/// Storage type of the deletion vector: 'u' = relative path derived from a UUID, 'i' = inline, 'p' = absolute path
pub storage_type: StorageType,

/// If storageType = 'u': <random prefix - optional><base85-encoded UUID>
/// If storageType = 'i': <base85-encoded bytes> of the deletion vector data
/// If storageType = 'p': <absolute path>
pub path_or_inline_dv: String,

/// Start of the data for this DV in number of bytes from the beginning of the file it is stored in. Always None (absent in JSON) when storageType = 'i'.
pub offset: Option<i32>,

/// Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline).
pub size_in_bytes: i32,

/// Number of rows the given DV logically removes from the file.
pub cardinality: i64,
}

impl PartialEq for DeletionVector {
fn eq(&self, other: &Self) -> bool {
self.storage_type == other.storage_type
&& self.path_or_inline_dv == other.path_or_inline_dv
&& self.offset == other.offset
&& self.size_in_bytes == other.size_in_bytes
&& self.cardinality == other.cardinality
}
}

impl Eq for DeletionVector {}

/// Delta log action that describes a parquet data file that is part of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
@@ -277,6 +362,9 @@ pub struct Add {
pub stats_parsed: Option<String>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,

/// Metadata about the deletion vector
pub deletion_vector: Option<DeletionVector>,
}

impl Hash for Add {
@@ -294,6 +382,7 @@ impl PartialEq for Add {
&& self.data_change == other.data_change
&& self.stats == other.stats
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}

@@ -441,6 +530,8 @@ pub struct Remove {
pub size: Option<i64>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
/// Metadata about the deletion vector
pub deletion_vector: Option<DeletionVector>,
}

impl Hash for Remove {
@@ -466,6 +557,7 @@ impl PartialEq for Remove {
&& self.partition_values == other.partition_values
&& self.size == other.size
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}

@@ -1034,7 +1126,112 @@ mod tests {

assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_with_deletion_vector() {
// test table with deletion vectors
let path = "./tests/data/table_with_deletion_logs";
let table = crate::open_table(path).await.unwrap();
let actions = table.get_state().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions
.schema()
.index_of("deletionVector.storageType")
.unwrap(),
actions
.schema()
.index_of("deletionVector.pathOrInlineDiv")
.unwrap(),
actions.schema().index_of("deletionVector.offset").unwrap(),
actions
.schema()
.index_of("deletionVector.sizeInBytes")
.unwrap(),
actions
.schema()
.index_of("deletionVector.cardinality")
.unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector.storageType",
Arc::new(array::StringArray::from(vec!["u"])),
),
(
"deletionVector.pathOrInlineDiv",
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"])),
),
(
"deletionVector.offset",
Arc::new(array::Int32Array::from(vec![1])),
),
(
"deletionVector.sizeInBytes",
Arc::new(array::Int32Array::from(vec![36])),
),
(
"deletionVector.cardinality",
Arc::new(array::Int64Array::from(vec![2])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

assert_eq!(expected, actions);

let actions = table.get_state().add_actions_table(false).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions.schema().index_of("deletionVector").unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector",
Arc::new(array::StructArray::new(
Fields::from(vec![
Field::new("storageType", DataType::Utf8, false),
Field::new("pathOrInlineDiv", DataType::Utf8, false),
Field::new("offset", DataType::Int32, true),
Field::new("sizeInBytes", DataType::Int32, false),
Field::new("cardinality", DataType::Int64, false),
]),
vec![
Arc::new(array::StringArray::from(vec!["u"])) as ArrayRef,
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"]))
as ArrayRef,
Arc::new(array::Int32Array::from(vec![1])) as ArrayRef,
Arc::new(array::Int32Array::from(vec![36])) as ArrayRef,
Arc::new(array::Int64Array::from(vec![2])) as ArrayRef,
],
None,
)),
),
];
let expected = RecordBatch::try_from_iter(expected_columns).unwrap();

assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_without_partitions() {
// test table without partitions
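A quick sketch of the `StorageType` round trip added in this file (assuming `deltalake::action` re-exports the new types):

```rust
use std::str::FromStr;

use deltalake::action::{ProtocolError, StorageType};

fn main() {
    // The protocol's single-letter codes map onto the enum variants.
    assert_eq!(
        StorageType::from_str("u").unwrap(),
        StorageType::UuidRelativePath
    );
    assert_eq!(StorageType::Inline.to_string(), "i");

    // Anything else is rejected with the new error variant.
    match StorageType::from_str("x") {
        Err(ProtocolError::InvalidDeletionVectorStorageType(s)) => assert_eq!(s, "x"),
        _ => panic!("expected InvalidDeletionVectorStorageType"),
    }
}
```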
66 changes: 63 additions & 3 deletions rust/src/action/parquet_read/mod.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, str::FromStr};

use chrono::{SecondsFormat, TimeZone, Utc};
use num_bigint::BigInt;
@@ -7,10 +7,12 @@ use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor};
use serde_json::json;

use crate::action::{
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol, ProtocolError,
Remove, Stats, Txn,
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol,
ProtocolError, Remove, Stats, Txn,
};

use super::StorageType;

fn populate_hashmap_with_option_from_parquet_map(
map: &mut HashMap<String, Option<String>>,
pmap: &parquet::record::Map,
@@ -44,6 +46,56 @@ impl AddCDCFile {
}
}

impl DeletionVector {
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ProtocolError> {
let mut re = Self::default();
for (i, (name, _)) in record.get_column_iter().enumerate() {
match name.as_str() {
"storageType" => {
re.storage_type =
StorageType::from_str(record.get_string(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.storage_type", "string")
})?)?;
}
"pathOrInlineDv" => {
re.path_or_inline_dv = record
.get_string(i)
.map_err(|_| {
gen_action_type_error("add", "deletionVector.pathOrInlineDv", "string")
})?
.clone();
}
"offset" => {
re.offset = match record.get_int(i) {
Ok(x) => Some(x),
_ => None,
}
}
"sizeInBytes" => {
re.size_in_bytes = record.get_int(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.sizeInBytes", "int")
})?;
}
"cardinality" => {
re.cardinality = record.get_long(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.sizeInBytes", "long")
})?;
}
_ => {
log::debug!(
"Unexpected field name `{}` for deletion vector: {:?}",
name,
record
);
}
}
}
Ok(re)
}
}

impl Add {
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ProtocolError> {
let mut re = Self {
@@ -128,6 +180,14 @@ impl Add {
re.stats_parsed = None;
}
},
"deletionVector" => match record.get_group(i) {
Ok(row) => {
re.deletion_vector = Some(DeletionVector::from_parquet_record(row)?);
}
_ => {
re.deletion_vector = None;
}
},
_ => {
log::debug!(
"Unexpected field name `{}` for add action: {:?}",
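End to end, the new field also rides along when an add action is deserialized from a JSON commit. A sketch under the same module-path assumption, with values echoing the test table above (the modification time is made up):

```rust
use deltalake::action::Add;

fn main() -> serde_json::Result<()> {
    // A minimal add action as it would appear in a JSON commit log entry,
    // now carrying a deletionVector.
    let raw = r#"{
        "path": "part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
        "partitionValues": {},
        "size": 10499,
        "modificationTime": 0,
        "dataChange": true,
        "deletionVector": {
            "storageType": "u",
            "pathOrInlineDv": "Q6Kt3y1b)0MgZSWwPunr",
            "offset": 1,
            "sizeInBytes": 36,
            "cardinality": 2
        }
    }"#;
    let add: Add = serde_json::from_str(raw)?;
    let dv = add
        .deletion_vector
        .expect("deletion vector metadata is preserved");
    assert_eq!(dv.cardinality, 2);
    Ok(())
}
```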