Skip to content

Commit

Permalink
feat: updated table features to use HashSet and separate reader and w…
Browse files Browse the repository at this point in the history
…riter features to be able to easily calculate min reader/writer version
  • Loading branch information
scarman-db committed Oct 25, 2023
1 parent e4d36fa commit 4dc2d52
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 48 deletions.
73 changes: 49 additions & 24 deletions rust/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
Expand Down Expand Up @@ -76,7 +75,7 @@ pub enum ProtocolError {
source: parquet::errors::ParquetError,
},

/// Faild to serialize operation
/// Failed to serialize operation
#[error("Failed to serialize operation: {source}")]
SerializeOperation {
#[from]
Expand Down Expand Up @@ -644,17 +643,45 @@ pub struct Protocol {
/// Table features are missing from older versions
/// The table features this reader supports
#[serde(skip_serializing_if = "Option::is_none")]
pub reader_features: Option<Vec<TableFeatures>>,
pub reader_features: Option<HashSet<ReaderFeatures>>,
/// Table features are missing from older versions
/// The table features this writer supports
#[serde(skip_serializing_if = "Option::is_none")]
pub writer_features: Option<Vec<TableFeatures>>,
pub writer_features: Option<HashSet<WriterFeatures>>,
}

/// Features table reader/writers can support as well as let users know
/// Features that table readers can support; this also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableFeatures {
// NOTE(review): `#[serde(alias = ...)]` only adds an extra name accepted when
// deserializing; with these derives the serialized name presumably remains the
// Rust identifier (e.g. `COLUMN_MAPPING`). Confirm that is intended, since the
// aliases suggest the camelCase spelling is the expected on-disk form.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum ReaderFeatures {
/// Mapping of one column to another.
/// Also deserializes from `columnMapping`.
#[serde(alias = "columnMapping")]
COLUMN_MAPPING,
/// Deletion vectors for merge, update, delete.
/// Also deserializes from `deletionVectors`.
#[serde(alias = "deletionVectors")]
DELETION_VECTORS,
/// Support for timestamps without a timezone.
/// Also deserializes from `timestampNtz`.
#[serde(alias = "timestampNtz")]
TIMESTAMP_WITHOUT_TIMEZONE,
/// Version 2 of checkpointing.
/// Also deserializes from `v2Checkpoint`.
#[serde(alias = "v2Checkpoint")]
V2_CHECKPOINT,
}

/// Numeric version associated with a reader feature, used when working out
/// the minimum reader version for a set of features.
///
/// Implemented as `From<ReaderFeatures> for usize` instead of a hand-written
/// `Into<usize>`: the standard library's blanket impl still provides
/// `Into<usize> for ReaderFeatures`, so existing `.into()` call sites keep
/// compiling, and `usize::from(feature)` becomes available as well.
impl From<ReaderFeatures> for usize {
    /// Returns `2` for `COLUMN_MAPPING` and `3` for every other reader feature.
    fn from(feature: ReaderFeatures) -> Self {
        // Exhaustive on purpose (no `_` arm): adding a variant must force a
        // decision about its version here.
        match feature {
            ReaderFeatures::COLUMN_MAPPING => 2,
            ReaderFeatures::DELETION_VECTORS
            | ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE
            | ReaderFeatures::V2_CHECKPOINT => 3,
        }
    }
}
/// Features that table writers can support; this also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum WriterFeatures {
/// Append Only Tables
#[serde(alias = "appendOnly")]
APPEND_ONLY,
Expand Down Expand Up @@ -699,23 +726,21 @@ pub enum TableFeatures {
LIQUID,
}

impl fmt::Display for TableFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl Into<usize> for WriterFeatures {
fn into(self) -> usize {
match self {
TableFeatures::APPEND_ONLY => write!(f, "appendOnly"),
TableFeatures::INVARIANTS => write!(f, "invariants"),
TableFeatures::CHECK_CONSTRAINTS => write!(f, "checkConstraints"),
TableFeatures::CHANGE_DATA_FEED => write!(f, "changeDataFeed"),
TableFeatures::GENERATED_COLUMNS => write!(f, "generatedColumns"),
TableFeatures::COLUMN_MAPPING => write!(f, "columnMapping"),
TableFeatures::IDENTITY_COLUMNS => write!(f, "identityColumns"),
TableFeatures::DELETION_VECTORS => write!(f, "deletionVector"),
TableFeatures::ROW_TRACKING => write!(f, "rowTracking"),
TableFeatures::TIMESTAMP_WITHOUT_TIMEZONE => write!(f, "timestampNtz"),
TableFeatures::DOMAIN_METADATA => write!(f, "domainMetadata"),
TableFeatures::V2_CHECKPOINT => write!(f, "v2Checkpoint"),
TableFeatures::ICEBERG_COMPAT_V1 => write!(f, "icebergCompatV1"),
TableFeatures::LIQUID => write!(f, "liquid"),
WriterFeatures::APPEND_ONLY | WriterFeatures::INVARIANTS => 2,
WriterFeatures::CHECK_CONSTRAINTS => 3,
WriterFeatures::CHANGE_DATA_FEED | WriterFeatures::GENERATED_COLUMNS => 4,
WriterFeatures::COLUMN_MAPPING => 5,
WriterFeatures::IDENTITY_COLUMNS
| WriterFeatures::DELETION_VECTORS
| WriterFeatures::ROW_TRACKING
| WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE
| WriterFeatures::DOMAIN_METADATA
| WriterFeatures::V2_CHECKPOINT
| WriterFeatures::ICEBERG_COMPAT_V1
| WriterFeatures::LIQUID => 7,
}
}
}
Expand Down
46 changes: 30 additions & 16 deletions rust/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::json;

use crate::protocol::{
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol,
ProtocolError, Remove, Stats, TableFeatures, Txn,
ProtocolError, ReaderFeatures, Remove, Stats, Txn, WriterFeatures,
};

use super::StorageType;
Expand Down Expand Up @@ -592,24 +592,38 @@ impl Txn {
}
}

impl From<&Field> for TableFeatures {
impl From<&Field> for ReaderFeatures {
fn from(value: &Field) -> Self {
match value {
Field::Str(feature) => match feature.as_str() {
"appendOnly" => TableFeatures::APPEND_ONLY,
"invariants" => TableFeatures::INVARIANTS,
"checkConstraints" => TableFeatures::CHECK_CONSTRAINTS,
"changeDataFeed" => TableFeatures::CHANGE_DATA_FEED,
"generatedColumns" => TableFeatures::GENERATED_COLUMNS,
"columnMapping" => TableFeatures::COLUMN_MAPPING,
"identityColumns" => TableFeatures::IDENTITY_COLUMNS,
"deletionVectors" => TableFeatures::DELETION_VECTORS,
"rowTracking" => TableFeatures::ROW_TRACKING,
"timestampNtz" => TableFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"domainMetadata" => TableFeatures::DOMAIN_METADATA,
"v2Checkpoint" => TableFeatures::V2_CHECKPOINT,
"icebergCompatV1" => TableFeatures::ICEBERG_COMPAT_V1,
"liquid" => TableFeatures::LIQUID,
"columnMapping" => ReaderFeatures::COLUMN_MAPPING,
"deletionVectors" => ReaderFeatures::DELETION_VECTORS,
"timestampNtz" => ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"v2Checkpoint" => ReaderFeatures::V2_CHECKPOINT,
f => panic!("Unknown reader feature encountered: {}", f),
},
f => panic!("Unknown field in reader features field: {}", f),
}
}
}
impl From<&Field> for WriterFeatures {
fn from(value: &Field) -> Self {
match value {
Field::Str(feature) => match feature.as_str() {
"appendOnly" => WriterFeatures::APPEND_ONLY,
"invariants" => WriterFeatures::INVARIANTS,
"checkConstraints" => WriterFeatures::CHECK_CONSTRAINTS,
"changeDataFeed" => WriterFeatures::CHANGE_DATA_FEED,
"generatedColumns" => WriterFeatures::GENERATED_COLUMNS,
"columnMapping" => WriterFeatures::COLUMN_MAPPING,
"identityColumns" => WriterFeatures::IDENTITY_COLUMNS,
"deletionVectors" => WriterFeatures::DELETION_VECTORS,
"rowTracking" => WriterFeatures::ROW_TRACKING,
"timestampNtz" => WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"domainMetadata" => WriterFeatures::DOMAIN_METADATA,
"v2Checkpoint" => WriterFeatures::V2_CHECKPOINT,
"icebergCompatV1" => WriterFeatures::ICEBERG_COMPAT_V1,
"liquid" => WriterFeatures::LIQUID,
f => panic!("Unknown table feature encountered: {}", f),
},
f => panic!("Invalid field type for table features: {}", f),
Expand Down
7 changes: 4 additions & 3 deletions rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use self::state::DeltaTableState;
use crate::errors::DeltaTableError;
use crate::partitions::PartitionFilter;
use crate::protocol::{
self, find_latest_check_point_for_version, get_last_checkpoint, Action, TableFeatures,
self, find_latest_check_point_for_version, get_last_checkpoint, Action, ReaderFeatures,
WriterFeatures,
};
use crate::protocol::{Add, ProtocolError, Stats};
use crate::schema::*;
Expand Down Expand Up @@ -821,12 +822,12 @@ impl DeltaTable {
}

/// Returns current supported reader features by this table
pub fn get_reader_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn get_reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.state.reader_features()
}

/// Returns current supported writer features by this table
pub fn get_writer_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn get_writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.state.writer_features()
}

Expand Down
10 changes: 5 additions & 5 deletions rust/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use super::config::TableConfig;
use crate::errors::DeltaTableError;
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::protocol::{self, Action, Add, ProtocolError, TableFeatures};
use crate::protocol::{self, Action, Add, ProtocolError, ReaderFeatures, WriterFeatures};
use crate::schema::SchemaDataType;
use crate::storage::commit_uri_from_version;
use crate::table::DeltaTableMetaData;
Expand Down Expand Up @@ -41,8 +41,8 @@ pub struct DeltaTableState {
app_transaction_version: HashMap<String, i64>,
min_reader_version: i32,
min_writer_version: i32,
reader_features: Option<Vec<TableFeatures>>,
writer_features: Option<Vec<TableFeatures>>,
reader_features: Option<HashSet<ReaderFeatures>>,
writer_features: Option<HashSet<WriterFeatures>>,
// table metadata corresponding to current version
current_metadata: Option<DeltaTableMetaData>,
// retention period for tombstones in milli-seconds
Expand Down Expand Up @@ -232,12 +232,12 @@ impl DeltaTableState {
}

/// Current supported reader features
pub fn reader_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.reader_features.as_ref()
}

/// Current supported writer features
pub fn writer_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.writer_features.as_ref()
}

Expand Down

0 comments on commit 4dc2d52

Please sign in to comment.