
feat: add metadata for deletion vectors #1583

Merged: 19 commits, merged on Aug 15, 2023
1 change: 1 addition & 0 deletions python/src/error.rs
@@ -68,6 +68,7 @@ fn checkpoint_to_py(err: ProtocolError) -> PyErr {
ProtocolError::CheckpointNotFound => DeltaProtocolError::new_err(err.to_string()),
ProtocolError::InvalidField(err) => PyValueError::new_err(err),
ProtocolError::InvalidRow(err) => PyValueError::new_err(err),
ProtocolError::InvalidDeletionVectorStorageType(err) => PyValueError::new_err(err),
ProtocolError::SerializeOperation { source } => PyValueError::new_err(source.to_string()),
ProtocolError::ParquetParseError { source } => PyIOError::new_err(source.to_string()),
ProtocolError::IO { source } => PyIOError::new_err(source.to_string()),
2 changes: 2 additions & 0 deletions python/src/lib.rs
@@ -505,6 +505,7 @@ impl RawDeltaTable {
extended_file_metadata: Some(old_add.tags.is_some()),
partition_values: Some(old_add.partition_values.clone()),
size: Some(old_add.size),
deletion_vector: old_add.deletion_vector.clone(),
tags: old_add.tags.clone(),
});
actions.push(remove_action);
@@ -788,6 +789,7 @@ impl From<&PyAddAction> for action::Add {
stats: action.stats.clone(),
stats_parsed: None,
tags: None,
deletion_vector: None,
}
}
}
197 changes: 197 additions & 0 deletions rust/src/action/mod.rs
@@ -22,6 +22,7 @@ use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::str::FromStr;

use crate::delta_config::IsolationLevel;
use crate::errors::DeltaResult;
@@ -49,6 +50,10 @@ pub enum ProtocolError {
#[error("Invalid action in parquet row: {0}")]
InvalidRow(String),

/// A transaction log contains an invalid deletion vector storage type
#[error("Invalid deletion vector storage type: {0}")]
InvalidDeletionVectorStorageType(String),

/// A generic action error. The wrapped error string describes the details.
#[error("Generic action error: {0}")]
Generic(String),
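
(Reviewer note: a minimal standalone sketch of the error-message plumbing. `DemoError` is a hypothetical stand-in for one `ProtocolError` variant, and the `thiserror` crate is assumed, matching what this enum already derives; the `#[error("...")]` attribute generates the `Display` impl whose output the Python bindings wrap into a `PyErr`.)

```rust
// DemoError is a hypothetical mirror of ProtocolError; thiserror's
// #[error("...")] attribute generates the Display impl, so to_string()
// yields exactly the message the Python layer wraps in a PyErr.
#[derive(thiserror::Error, Debug)]
enum DemoError {
    #[error("Invalid deletion vector storage type: {0}")]
    InvalidDeletionVectorStorageType(String),
}

fn main() {
    let e = DemoError::InvalidDeletionVectorStorageType("x".into());
    assert_eq!(e.to_string(), "Invalid deletion vector storage type: x");
}
```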
@@ -219,6 +224,86 @@ pub struct AddCDCFile {
pub tags: Option<HashMap<String, Option<String>>>,
}

/// Storage type of a deletion vector.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum StorageType {
/// Stored at relative path derived from a UUID.
#[serde(rename = "u")]
UuidRelativePath,
/// Stored as inline string.
#[serde(rename = "i")]
Inline,
/// Stored at an absolute path.
#[serde(rename = "p")]
AbsolutePath,
}

impl Default for StorageType {
fn default() -> Self {
Self::UuidRelativePath // seems to be used by Databricks and therefore most common
}
}

impl FromStr for StorageType {
type Err = ProtocolError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"u" => Ok(Self::UuidRelativePath),
"i" => Ok(Self::Inline),
"p" => Ok(Self::AbsolutePath),
_ => Err(ProtocolError::InvalidDeletionVectorStorageType(
s.to_string(),
)),
}
}
}

// Implementing Display (instead of ToString directly) still provides
// to_string() via the standard blanket impl.
impl std::fmt::Display for StorageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UuidRelativePath => write!(f, "u"),
Self::Inline => write!(f, "i"),
Self::AbsolutePath => write!(f, "p"),
}
}
}
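
(Reviewer note: a self-contained sketch of the one-letter tag scheme above. The enum and the `FromStr` impl are mirrored locally so the snippet compiles on its own, with `serde`/`serde_json` assumed as dependencies; it checks that the serde renames and `FromStr` agree on the wire form.)

```rust
use std::str::FromStr;

// Local mirror of the StorageType enum above, redeclared so this sketch
// compiles on its own.
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
enum StorageType {
    #[serde(rename = "u")]
    UuidRelativePath,
    #[serde(rename = "i")]
    Inline,
    #[serde(rename = "p")]
    AbsolutePath,
}

impl FromStr for StorageType {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "u" => Ok(Self::UuidRelativePath),
            "i" => Ok(Self::Inline),
            "p" => Ok(Self::AbsolutePath),
            other => Err(format!("invalid storage type: {other}")),
        }
    }
}

fn main() {
    // serde writes each variant as its one-letter tag, quoted as a JSON string...
    assert_eq!(serde_json::to_string(&StorageType::Inline).unwrap(), "\"i\"");
    // ...and FromStr parses the bare tag back to the variant.
    assert_eq!(StorageType::from_str("p").unwrap(), StorageType::AbsolutePath);
    // Unknown tags are rejected rather than silently defaulted.
    assert!(StorageType::from_str("x").is_err());
}
```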

/// Describes deleted rows of a parquet file as part of an add or remove action
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct DeletionVector {
/// Storage type of the deletion vector: 'p' = absolute path, 'i' = inline, 'u' = UUID relative path.
pub storage_type: StorageType,

/// If storageType = 'u', then <random prefix - optional><base85 encoded uuid>.
/// If storageType = 'i', then <base85 encoded bytes> of the deletion vector data.
/// If storageType = 'p', then <absolute path>.
pub path_or_inline_dv: String,

/// Start of the data for this DV, in bytes from the beginning of the file it is stored in. Always None (absent in JSON) when storageType = 'i'.
pub offset: Option<i32>,

/// Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline).
pub size_in_bytes: i32,

/// Number of rows the given DV logically removes from the file.
pub cardinality: i64,
}

impl PartialEq for DeletionVector {
fn eq(&self, other: &Self) -> bool {
self.storage_type == other.storage_type
&& self.path_or_inline_dv == other.path_or_inline_dv
&& self.offset == other.offset
&& self.size_in_bytes == other.size_in_bytes
&& self.cardinality == other.cardinality
}
}

impl Eq for DeletionVector {}
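
(Reviewer note: a hedged sketch of how a `deletionVector` entry in a JSON commit maps onto this struct via `rename_all = "camelCase"`. The struct is mirrored locally with `storage_type` as a plain `String` for brevity; the sample values are illustrative, with the path taken from the base85 example in the Delta protocol spec.)

```rust
// Local mirror of DeletionVector so the sketch compiles standalone;
// storage_type is kept as a plain String here for brevity.
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct DeletionVector {
    storage_type: String,
    path_or_inline_dv: String,
    offset: Option<i32>,
    size_in_bytes: i32,
    cardinality: i64,
}

fn main() {
    // Illustrative 'u' (UUID relative path) entry; offset would be absent
    // (None) for an inline ('i') deletion vector.
    let json = r#"{
        "storageType": "u",
        "pathOrInlineDv": "ab^-aqEH.-t@S}K{vb[*k^",
        "offset": 4,
        "sizeInBytes": 40,
        "cardinality": 6
    }"#;
    let dv: DeletionVector = serde_json::from_str(json).unwrap();
    assert_eq!(dv.storage_type, "u");
    assert_eq!(dv.offset, Some(4));
    assert_eq!(dv.cardinality, 6);
}
```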

/// Delta log action that describes a parquet data file that is part of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
@@ -277,6 +362,9 @@ pub struct Add {
pub stats_parsed: Option<String>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,

/// Metadata about the deletion vector, if any
pub deletion_vector: Option<DeletionVector>,
}

impl Hash for Add {
@@ -294,6 +382,7 @@ impl PartialEq for Add {
&& self.data_change == other.data_change
&& self.stats == other.stats
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}

@@ -441,6 +530,8 @@ pub struct Remove {
pub size: Option<i64>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
/// Metadata about the deletion vector, if any
pub deletion_vector: Option<DeletionVector>,
}

impl Hash for Remove {
@@ -466,6 +557,7 @@ impl PartialEq for Remove {
&& self.partition_values == other.partition_values
&& self.size == other.size
&& self.tags == other.tags
&& self.deletion_vector == other.deletion_vector
}
}

@@ -1034,7 +1126,112 @@ mod tests {

assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_with_deletion_vector() {
// test table with deletion vectors
let path = "./tests/data/table_with_deletion_logs";
let table = crate::open_table(path).await.unwrap();
let actions = table.get_state().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions
.schema()
.index_of("deletionVector.storageType")
.unwrap(),
actions
.schema()
.index_of("deletionVector.pathOrInlineDiv")
.unwrap(),
actions.schema().index_of("deletionVector.offset").unwrap(),
actions
.schema()
.index_of("deletionVector.sizeInBytes")
.unwrap(),
actions
.schema()
.index_of("deletionVector.cardinality")
.unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector.storageType",
Arc::new(array::StringArray::from(vec!["u"])),
),
(
"deletionVector.pathOrInlineDiv",
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"])),
),
(
"deletionVector.offset",
Arc::new(array::Int32Array::from(vec![1])),
),
(
"deletionVector.sizeInBytes",
Arc::new(array::Int32Array::from(vec![36])),
),
(
"deletionVector.cardinality",
Arc::new(array::Int64Array::from(vec![2])),
),
];
let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

assert_eq!(expected, actions);

let actions = table.get_state().add_actions_table(false).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
let actions = actions
.project(&[
actions.schema().index_of("path").unwrap(),
actions.schema().index_of("size_bytes").unwrap(),
actions.schema().index_of("deletionVector").unwrap(),
])
.unwrap();
let expected_columns: Vec<(&str, ArrayRef)> = vec![
(
"path",
Arc::new(array::StringArray::from(vec![
"part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
])),
),
("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
(
"deletionVector",
Arc::new(array::StructArray::new(
Fields::from(vec![
Field::new("storageType", DataType::Utf8, false),
Field::new("pathOrInlineDiv", DataType::Utf8, false),
Field::new("offset", DataType::Int32, true),
Field::new("sizeInBytes", DataType::Int32, false),
Field::new("cardinality", DataType::Int64, false),
]),
vec![
Arc::new(array::StringArray::from(vec!["u"])) as ArrayRef,
Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"]))
as ArrayRef,
Arc::new(array::Int32Array::from(vec![1])) as ArrayRef,
Arc::new(array::Int32Array::from(vec![36])) as ArrayRef,
Arc::new(array::Int64Array::from(vec![2])) as ArrayRef,
],
None,
)),
),
];
let expected = RecordBatch::try_from_iter(expected_columns).unwrap();

assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_without_partitions() {
// test table without partitions
66 changes: 63 additions & 3 deletions rust/src/action/parquet_read/mod.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, str::FromStr};

use chrono::{SecondsFormat, TimeZone, Utc};
use num_bigint::BigInt;
@@ -7,10 +7,12 @@ use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor};
use serde_json::json;

use crate::action::{
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol, ProtocolError,
Remove, Stats, Txn,
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol,
ProtocolError, Remove, Stats, Txn,
};

use super::StorageType;

fn populate_hashmap_with_option_from_parquet_map(
map: &mut HashMap<String, Option<String>>,
pmap: &parquet::record::Map,
@@ -44,6 +46,56 @@ impl AddCDCFile {
}
}

impl DeletionVector {
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ProtocolError> {
let mut re = Self::default();
for (i, (name, _)) in record.get_column_iter().enumerate() {
match name.as_str() {
"storageType" => {
re.storage_type =
StorageType::from_str(record.get_string(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.storageType", "string")
})?)?;
}
"pathOrInlineDv" => {
re.path_or_inline_dv = record
.get_string(i)
.map_err(|_| {
gen_action_type_error("add", "deletionVector.pathOrInlineDv", "string")
})?
.clone();
}
"offset" => {
re.offset = record.get_int(i).ok();
}
"sizeInBytes" => {
re.size_in_bytes = record.get_int(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.sizeInBytes", "int")
})?;
}
"cardinality" => {
re.cardinality = record.get_long(i).map_err(|_| {
gen_action_type_error("add", "deletionVector.cardinality", "long")
})?;
}
_ => {
log::debug!(
"Unexpected field name `{}` for deletion vector: {:?}",
name,
record
);
}
}
}
Ok(re)
}
}

impl Add {
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ProtocolError> {
let mut re = Self {
@@ -128,6 +180,14 @@ impl Add {
re.stats_parsed = None;
}
},
"deletionVector" => match record.get_group(i) {
Ok(row) => {
re.deletion_vector = Some(DeletionVector::from_parquet_record(row)?);
}
_ => {
re.deletion_vector = None;
}
},
_ => {
log::debug!(
"Unexpected field name `{}` for add action: {:?}",
name,
record
);
}