diff --git a/rust/src/lib.rs b/rust/src/lib.rs index b990214c5c..bffc482001 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -190,6 +190,8 @@ mod tests { assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -232,6 +234,8 @@ mod tests { assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -246,6 +250,8 @@ mod tests { assert_eq!(table.version(), 2); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -260,6 +266,8 @@ mod tests { assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -276,6 +284,8 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -319,6 +329,8 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -330,6 +342,8 @@ mod tests { assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.get_reader_features().len(), 0); + assert_eq!(table.get_writer_features().len(), 0); assert_eq!( table.get_files(), vec![ @@ -665,4 +679,22 @@ mod tests { ),] ); } + + #[tokio::test] + async fn read_delta_table_with_reader_writer_features() { + let table = crate::open_table("./tests/data/simple_table_with_reader_writer_features") + .await + .unwrap(); + assert_eq!(table.get_min_writer_version(), 7); + assert_eq!(table.get_min_reader_version(), 3); + + let reader_features = table.get_reader_features(); + assert_eq!(reader_features.len(), 1); + assert!(reader_features.contains(&"columnMapping".to_string())); + + let writer_features = table.get_writer_features(); + assert_eq!(writer_features.len(), 2); + assert!(writer_features.contains(&"columnMapping".to_string())); + assert!(writer_features.contains(&"identityColumns".to_string())); + } } diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 7429007a6f..66ecc7c295 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -250,6 +250,7 @@ impl CreateBuilder { .unwrap_or_else(|| Protocol { min_reader_version: MAX_SUPPORTED_READER_VERSION, min_writer_version: MAX_SUPPORTED_WRITER_VERSION, + ..Default::default() }); let metadata = DeltaTableMetaData::new( @@ -393,12 +394,15 @@ mod tests { assert_eq!(table.version(), 0); assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION); assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION); + assert!(table.get_reader_features().is_empty()); + assert!(table.get_writer_features().is_empty()); assert_eq!(table.schema().unwrap(), &schema); // check we can overwrite default settings via adding actions let protocol = Protocol { min_reader_version: 0, min_writer_version: 0, + ..Default::default() }; let table = CreateBuilder::new() .with_location("memory://") diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index 59aceaa98f..7d521f81c9 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -205,6 +205,8 @@ async fn execute( Protocol { min_reader_version: table.get_min_reader_version(), min_writer_version: table.get_min_writer_version(), + reader_features: table.get_reader_features().into_iter().cloned().collect(), + writer_features: table.get_writer_features().into_iter().cloned().collect(), } } else { Protocol { @@ -216,6 +218,16 @@ async fn execute( table.get_min_writer_version(), snapshot.min_writer_version(), ), + reader_features: table + .get_reader_features() + .union(snapshot.reader_features()) + .cloned() + .collect(), + writer_features: table + .get_writer_features() + .union(snapshot.writer_features()) + .cloned() + .collect(), } }; actions.push(Action::protocol(protocol)); diff --git a/rust/src/operations/transaction/test_utils.rs b/rust/src/operations/transaction/test_utils.rs index ec7848b95a..2d0875843e 100644 --- a/rust/src/operations/transaction/test_utils.rs +++ b/rust/src/operations/transaction/test_utils.rs @@ -33,6 +33,7 @@ pub fn create_protocol_action(max_reader: Option, max_writer: Option) let protocol = Protocol { min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION), min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION), + ..Default::default() }; Action::protocol(protocol) } @@ -134,6 +135,7 @@ pub async fn create_initialized_table( protocol: Protocol { min_reader_version: 1, min_writer_version: 1, + ..Default::default() }, metadata: DeltaTableMetaData::new( None, diff --git a/rust/src/protocol/checkpoints.rs b/rust/src/protocol/checkpoints.rs index 3bf2eb962e..45aaa2840c 100644 --- a/rust/src/protocol/checkpoints.rs +++ b/rust/src/protocol/checkpoints.rs @@ -321,6 +321,8 @@ fn parquet_bytes_from_state( let jsons = std::iter::once(Action::protocol(Protocol { min_reader_version: state.min_reader_version(), min_writer_version: state.min_writer_version(), + reader_features: state.reader_features().into_iter().cloned().collect(), + writer_features: state.writer_features().into_iter().cloned().collect(), })) // metaData .chain(std::iter::once(Action::metaData(MetaData::try_from( diff --git a/rust/src/protocol/mod.rs b/rust/src/protocol/mod.rs index 424b83284e..c37c59040e 100644 --- a/rust/src/protocol/mod.rs +++ b/rust/src/protocol/mod.rs @@ -21,7 +21,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::borrow::Borrow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::mem::take; use std::str::FromStr; @@ -637,9 +637,15 @@ pub struct Protocol { /// Minimum version of the Delta read protocol a client must implement to correctly read the /// table. pub min_reader_version: i32, - /// Minimum version of the Delta write protocol a client must implement to correctly read the + /// Minimum version of the Delta write protocol a client must implement to correctly write the /// table. pub min_writer_version: i32, + /// A collection of reader features that a client must implement in order to correctly read the table. + #[serde(skip_serializing_if = "HashSet::is_empty", default)] + pub reader_features: HashSet, + /// A collection of writer features that a client must implement in order to correctly write the table. + #[serde(skip_serializing_if = "HashSet::is_empty", default)] + pub writer_features: HashSet, } /// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored. diff --git a/rust/src/table/mod.rs b/rust/src/table/mod.rs index 4883134fcd..229cf3414c 100644 --- a/rust/src/table/mod.rs +++ b/rust/src/table/mod.rs @@ -818,6 +818,16 @@ impl DeltaTable { self.state.min_writer_version() } + /// Returns a collection of reader features supported by the DeltaTable based on the loaded metadata. + pub fn get_reader_features(&self) -> &HashSet { + self.state.reader_features() + } + + /// Returns a collection of writer features supported by the DeltaTable based on the loaded metadata. + pub fn get_writer_features(&self) -> &HashSet { + self.state.writer_features() + } + /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or /// no metadata was found in the log. pub fn schema(&self) -> Option<&Schema> { diff --git a/rust/src/table/state.rs b/rust/src/table/state.rs index e72f726ba8..4f5f399fdf 100644 --- a/rust/src/table/state.rs +++ b/rust/src/table/state.rs @@ -41,6 +41,8 @@ pub struct DeltaTableState { app_transaction_version: HashMap, min_reader_version: i32, min_writer_version: i32, + reader_features: HashSet, + writer_features: HashSet, // table metadata corresponding to current version current_metadata: Option, // retention period for tombstones in milli-seconds @@ -229,6 +231,16 @@ impl DeltaTableState { self.min_writer_version } + /// A collection of reader features required by the protocol. + pub fn reader_features(&self) -> &HashSet { + &self.reader_features + } + + /// A collection of writer features required by the protocol. + pub fn writer_features(&self) -> &HashSet { + &self.writer_features + } + /// The most recent metadata of the table. pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { self.current_metadata.as_ref() @@ -290,6 +302,14 @@ impl DeltaTableState { self.min_writer_version = new_state.min_writer_version; } + for feature in new_state.reader_features.iter() { + self.reader_features.insert(feature.to_owned()); + } + + for feature in new_state.writer_features.iter() { + self.writer_features.insert(feature.to_owned()); + } + if new_state.current_metadata.is_some() { self.tombstone_retention_millis = new_state.tombstone_retention_millis; self.log_retention_millis = new_state.log_retention_millis; @@ -339,6 +359,14 @@ impl DeltaTableState { protocol::Action::protocol(v) => { self.min_reader_version = v.min_reader_version; self.min_writer_version = v.min_writer_version; + + for feature in v.reader_features.iter() { + self.reader_features.insert(feature.to_owned()); + } + + for feature in v.writer_features.iter() { + self.writer_features.insert(feature.to_owned()); + } } protocol::Action::metaData(v) => { let md = DeltaTableMetaData::try_from(v)?; @@ -421,6 +449,8 @@ mod tests { app_transaction_version: Default::default(), min_reader_version: 0, min_writer_version: 0, + reader_features: Default::default(), + writer_features: Default::default(), current_metadata: None, tombstone_retention_millis: 0, log_retention_millis: 0, @@ -446,6 +476,8 @@ mod tests { current_metadata: None, min_reader_version: 1, min_writer_version: 1, + reader_features: Default::default(), + writer_features: Default::default(), app_transaction_version, tombstone_retention_millis: 0, log_retention_millis: 0, diff --git a/rust/tests/data/simple_table_with_reader_writer_features/_delta_log/00000000000000000000.json b/rust/tests/data/simple_table_with_reader_writer_features/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..dfa407e0bf --- /dev/null +++ b/rust/tests/data/simple_table_with_reader_writer_features/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1564524295023,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isBlindAppend":true}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["columnMapping"],"writerFeatures":["columnMapping","identityColumns"]}} +{"metaData":{"id":"22ef18ba-191c-4c36-a606-3dad5cdf3830","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1564524294376}}