Skip to content

Commit

Permalink
feat: added reading table features
Browse files Browse the repository at this point in the history
  • Loading branch information
scarman-db committed Oct 25, 2023
1 parent e4d36fa commit ff31a4f
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 58 deletions.
73 changes: 50 additions & 23 deletions rust/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
Expand Down Expand Up @@ -644,17 +643,46 @@ pub struct Protocol {
/// Table features are missing from older versions
/// The table features this reader supports
#[serde(skip_serializing_if = "Option::is_none")]
pub reader_features: Option<Vec<TableFeatures>>,
pub reader_features: Option<HashSet<ReaderFeatures>>,
/// Table features are missing from older versions
/// The table features this writer supports
#[serde(skip_serializing_if = "Option::is_none")]
pub writer_features: Option<Vec<TableFeatures>>,
pub writer_features: Option<HashSet<WriterFeatures>>,
}

/// Features table reader/writers can support as well as let users know
/// Features that table readers can support; also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableFeatures {
// `Hash` and `Eq` are derived so values can be stored in a `HashSet`.
// NOTE(review): each variant carries `#[serde(alias = "...")]`, not `rename`.
// An alias presumably only adds an extra name accepted when deserializing,
// while serialization would still emit the Rust variant identifier
// (e.g. `COLUMN_MAPPING`) — confirm this is the intended serialized form.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum ReaderFeatures {
/// Column mapping: mapping of one column to another.
/// Also accepted as `columnMapping` when deserializing.
#[serde(alias = "columnMapping")]
COLUMN_MAPPING,
/// Deletion vectors for merge, update, and delete.
/// Also accepted as `deletionVectors` when deserializing.
#[serde(alias = "deletionVectors")]
DELETION_VECTORS,
/// Support for timestamps without a timezone.
/// Also accepted as `timestampNtz` when deserializing.
#[serde(alias = "timestampNtz")]
TIMESTAMP_WITHOUT_TIMEZONE,
/// Version 2 of checkpointing.
/// Also accepted as `v2Checkpoint` when deserializing.
#[serde(alias = "v2Checkpoint")]
V2_CHECKPOINT,
}

/// Converts a reader feature into its associated version number.
///
/// This is implemented as `From<ReaderFeatures> for usize` rather than a
/// hand-written `Into<usize>`: the standard library's blanket impl still
/// provides `Into<usize>` for every existing `feature.into()` call site, and
/// `usize::from(feature)` becomes available as well, so no lint suppression
/// is needed.
impl From<ReaderFeatures> for usize {
    /// Returns `2` for `COLUMN_MAPPING` and `3` for every other reader feature.
    ///
    /// NOTE(review): the numbers are presumably the minimum reader protocol
    /// version associated with each feature — confirm against the protocol.
    fn from(value: ReaderFeatures) -> Self {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match value {
            ReaderFeatures::COLUMN_MAPPING => 2,
            ReaderFeatures::DELETION_VECTORS
            | ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE
            | ReaderFeatures::V2_CHECKPOINT => 3,
        }
    }
}
/// Features that table writers can support; also lets users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum WriterFeatures {
/// Append Only Tables
#[serde(alias = "appendOnly")]
APPEND_ONLY,
Expand Down Expand Up @@ -699,23 +727,22 @@ pub enum TableFeatures {
LIQUID,
}

impl fmt::Display for TableFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[allow(clippy::from_over_into)]
impl Into<usize> for WriterFeatures {
fn into(self) -> usize {
match self {
TableFeatures::APPEND_ONLY => write!(f, "appendOnly"),
TableFeatures::INVARIANTS => write!(f, "invariants"),
TableFeatures::CHECK_CONSTRAINTS => write!(f, "checkConstraints"),
TableFeatures::CHANGE_DATA_FEED => write!(f, "changeDataFeed"),
TableFeatures::GENERATED_COLUMNS => write!(f, "generatedColumns"),
TableFeatures::COLUMN_MAPPING => write!(f, "columnMapping"),
TableFeatures::IDENTITY_COLUMNS => write!(f, "identityColumns"),
TableFeatures::DELETION_VECTORS => write!(f, "deletionVector"),
TableFeatures::ROW_TRACKING => write!(f, "rowTracking"),
TableFeatures::TIMESTAMP_WITHOUT_TIMEZONE => write!(f, "timestampNtz"),
TableFeatures::DOMAIN_METADATA => write!(f, "domainMetadata"),
TableFeatures::V2_CHECKPOINT => write!(f, "v2Checkpoint"),
TableFeatures::ICEBERG_COMPAT_V1 => write!(f, "icebergCompatV1"),
TableFeatures::LIQUID => write!(f, "liquid"),
WriterFeatures::APPEND_ONLY | WriterFeatures::INVARIANTS => 2,
WriterFeatures::CHECK_CONSTRAINTS => 3,
WriterFeatures::CHANGE_DATA_FEED | WriterFeatures::GENERATED_COLUMNS => 4,
WriterFeatures::COLUMN_MAPPING => 5,
WriterFeatures::IDENTITY_COLUMNS
| WriterFeatures::DELETION_VECTORS
| WriterFeatures::ROW_TRACKING
| WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE
| WriterFeatures::DOMAIN_METADATA
| WriterFeatures::V2_CHECKPOINT
| WriterFeatures::ICEBERG_COMPAT_V1
| WriterFeatures::LIQUID => 7,
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions rust/src/protocol/parquet2_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod stats;
mod string;
mod validity;

/// Parquet deserilization error
/// Parquet deserialization error
#[derive(thiserror::Error, Debug)]
pub enum ParseError {
/// Generic parsing error
Expand Down Expand Up @@ -619,14 +619,18 @@ fn deserialize_protocol_column_page(
page,
dict,
descriptor,
|action: &mut Protocol, v: Vec<String>| action.reader_features = Some(v),
|action: &mut Protocol, v: Vec<String>| {
action.reader_features = v.into_iter().map(Into::into).collect()
},
),
"writerFeatures" => for_each_repeated_string_field_value(
actions,
page,
dict,
descriptor,
|action: &mut Protocol, v: Vec<String>| action.writer_features = Some(v),
|action: &mut Protocol, v: Vec<String>| {
action.writer_features = v.into_iter().map(Into::into).collect()
},
),
_ => {
warn!("Unexpected field `{}` in protocol", f);
Expand Down
56 changes: 32 additions & 24 deletions rust/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::json;

use crate::protocol::{
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol,
ProtocolError, Remove, Stats, TableFeatures, Txn,
ProtocolError, ReaderFeatures, Remove, Stats, Txn, WriterFeatures,
};

use super::StorageType;
Expand Down Expand Up @@ -592,24 +592,38 @@ impl Txn {
}
}

impl From<&Field> for TableFeatures {
impl From<&Field> for ReaderFeatures {
fn from(value: &Field) -> Self {
match value {
Field::Str(feature) => match feature.as_str() {
"appendOnly" => TableFeatures::APPEND_ONLY,
"invariants" => TableFeatures::INVARIANTS,
"checkConstraints" => TableFeatures::CHECK_CONSTRAINTS,
"changeDataFeed" => TableFeatures::CHANGE_DATA_FEED,
"generatedColumns" => TableFeatures::GENERATED_COLUMNS,
"columnMapping" => TableFeatures::COLUMN_MAPPING,
"identityColumns" => TableFeatures::IDENTITY_COLUMNS,
"deletionVectors" => TableFeatures::DELETION_VECTORS,
"rowTracking" => TableFeatures::ROW_TRACKING,
"timestampNtz" => TableFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"domainMetadata" => TableFeatures::DOMAIN_METADATA,
"v2Checkpoint" => TableFeatures::V2_CHECKPOINT,
"icebergCompatV1" => TableFeatures::ICEBERG_COMPAT_V1,
"liquid" => TableFeatures::LIQUID,
"columnMapping" => ReaderFeatures::COLUMN_MAPPING,
"deletionVectors" => ReaderFeatures::DELETION_VECTORS,
"timestampNtz" => ReaderFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"v2Checkpoint" => ReaderFeatures::V2_CHECKPOINT,
f => panic!("Unknown reader feature encountered: {}", f),
},
f => panic!("Unknown field in reader features field: {}", f),
}
}
}
impl From<&Field> for WriterFeatures {
fn from(value: &Field) -> Self {
match value {
Field::Str(feature) => match feature.as_str() {
"appendOnly" => WriterFeatures::APPEND_ONLY,
"invariants" => WriterFeatures::INVARIANTS,
"checkConstraints" => WriterFeatures::CHECK_CONSTRAINTS,
"changeDataFeed" => WriterFeatures::CHANGE_DATA_FEED,
"generatedColumns" => WriterFeatures::GENERATED_COLUMNS,
"columnMapping" => WriterFeatures::COLUMN_MAPPING,
"identityColumns" => WriterFeatures::IDENTITY_COLUMNS,
"deletionVectors" => WriterFeatures::DELETION_VECTORS,
"rowTracking" => WriterFeatures::ROW_TRACKING,
"timestampNtz" => WriterFeatures::TIMESTAMP_WITHOUT_TIMEZONE,
"domainMetadata" => WriterFeatures::DOMAIN_METADATA,
"v2Checkpoint" => WriterFeatures::V2_CHECKPOINT,
"icebergCompatV1" => WriterFeatures::ICEBERG_COMPAT_V1,
"liquid" => WriterFeatures::LIQUID,
f => panic!("Unknown table feature encountered: {}", f),
},
f => panic!("Invalid field type for table features: {}", f),
Expand All @@ -636,16 +650,10 @@ impl Protocol {
})?;
}
"readerFeatures" => {
re.reader_features = record
.get_list(i)
.and_then(|l| Ok(l.elements().iter().map(From::from).collect()))
.ok();
re.reader_features = record.get_list(i).map(|l| l.elements().iter().map(From::from).collect()).ok()
}
"writerFeatures" => {
re.writer_features = record
.get_list(i)
.and_then(|l| Ok(l.elements().iter().map(From::from).collect()))
.ok();
re.writer_features = record.get_list(i).map(|l| l.elements().iter().map(From::from).collect()).ok()
}
_ => {
log::debug!(
Expand Down
7 changes: 4 additions & 3 deletions rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use self::state::DeltaTableState;
use crate::errors::DeltaTableError;
use crate::partitions::PartitionFilter;
use crate::protocol::{
self, find_latest_check_point_for_version, get_last_checkpoint, Action, TableFeatures,
self, find_latest_check_point_for_version, get_last_checkpoint, Action, ReaderFeatures,
WriterFeatures,
};
use crate::protocol::{Add, ProtocolError, Stats};
use crate::schema::*;
Expand Down Expand Up @@ -821,12 +822,12 @@ impl DeltaTable {
}

/// Returns the reader features currently supported by this table
pub fn get_reader_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn get_reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.state.reader_features()
}

/// Returns the writer features currently supported by this table
pub fn get_writer_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn get_writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.state.writer_features()
}

Expand Down
10 changes: 5 additions & 5 deletions rust/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use super::config::TableConfig;
use crate::errors::DeltaTableError;
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::protocol::{self, Action, Add, ProtocolError, TableFeatures};
use crate::protocol::{self, Action, Add, ProtocolError, ReaderFeatures, WriterFeatures};
use crate::schema::SchemaDataType;
use crate::storage::commit_uri_from_version;
use crate::table::DeltaTableMetaData;
Expand Down Expand Up @@ -41,8 +41,8 @@ pub struct DeltaTableState {
app_transaction_version: HashMap<String, i64>,
min_reader_version: i32,
min_writer_version: i32,
reader_features: Option<Vec<TableFeatures>>,
writer_features: Option<Vec<TableFeatures>>,
reader_features: Option<HashSet<ReaderFeatures>>,
writer_features: Option<HashSet<WriterFeatures>>,
// table metadata corresponding to current version
current_metadata: Option<DeltaTableMetaData>,
// retention period for tombstones in milliseconds
Expand Down Expand Up @@ -232,12 +232,12 @@ impl DeltaTableState {
}

/// Currently supported reader features
pub fn reader_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.reader_features.as_ref()
}

/// Currently supported writer features
pub fn writer_features(&self) -> Option<&Vec<TableFeatures>> {
pub fn writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.writer_features.as_ref()
}

Expand Down

0 comments on commit ff31a4f

Please sign in to comment.