Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metadata for reader\writer features #1754

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ mod tests {
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand Down Expand Up @@ -232,6 +234,8 @@ mod tests {
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand All @@ -246,6 +250,8 @@ mod tests {
assert_eq!(table.version(), 2);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand All @@ -260,6 +266,8 @@ mod tests {
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand All @@ -276,6 +284,8 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand Down Expand Up @@ -319,6 +329,8 @@ mod tests {
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand All @@ -330,6 +342,8 @@ mod tests {
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.get_reader_features().len(), 0);
assert_eq!(table.get_writer_features().len(), 0);
assert_eq!(
table.get_files(),
vec![
Expand Down Expand Up @@ -665,4 +679,22 @@ mod tests {
),]
);
}

#[tokio::test]
async fn read_delta_table_with_reader_writer_features() {
    // Fixture table whose protocol action carries explicit reader/writer feature lists.
    let table_uri = "./tests/data/simple_table_with_reader_writer_features";
    let table = crate::open_table(table_uri).await.unwrap();

    // Protocol versions are checked before the feature sets.
    assert_eq!(table.get_min_writer_version(), 7);
    assert_eq!(table.get_min_reader_version(), 3);

    // Exactly one reader feature is expected.
    let readers = table.get_reader_features();
    assert_eq!(readers.len(), 1);
    assert!(readers.contains("columnMapping"));

    // Exactly two writer features are expected; the set is unordered, so test membership.
    let writers = table.get_writer_features();
    assert_eq!(writers.len(), 2);
    for expected in ["columnMapping", "identityColumns"] {
        assert!(writers.contains(expected));
    }
}
}
4 changes: 4 additions & 0 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ impl CreateBuilder {
.unwrap_or_else(|| Protocol {
min_reader_version: MAX_SUPPORTED_READER_VERSION,
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
..Default::default()
});

let metadata = DeltaTableMetaData::new(
Expand Down Expand Up @@ -393,12 +394,15 @@ mod tests {
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION);
assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION);
assert!(table.get_reader_features().is_empty());
assert!(table.get_writer_features().is_empty());
assert_eq!(table.schema().unwrap(), &schema);

// check we can overwrite default settings via adding actions
let protocol = Protocol {
min_reader_version: 0,
min_writer_version: 0,
..Default::default()
};
let table = CreateBuilder::new()
.with_location("memory://")
Expand Down
12 changes: 12 additions & 0 deletions rust/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ async fn execute(
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
reader_features: table.get_reader_features().into_iter().cloned().collect(),
writer_features: table.get_writer_features().into_iter().cloned().collect(),
}
} else {
Protocol {
Expand All @@ -216,6 +218,16 @@ async fn execute(
table.get_min_writer_version(),
snapshot.min_writer_version(),
),
reader_features: table
.get_reader_features()
.union(snapshot.reader_features())
.cloned()
.collect(),
writer_features: table
.get_writer_features()
.union(snapshot.writer_features())
.cloned()
.collect(),
}
};
actions.push(Action::protocol(protocol));
Expand Down
2 changes: 2 additions & 0 deletions rust/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub fn create_protocol_action(max_reader: Option<i32>, max_writer: Option<i32>)
let protocol = Protocol {
min_reader_version: max_reader.unwrap_or(crate::operations::MAX_SUPPORTED_READER_VERSION),
min_writer_version: max_writer.unwrap_or(crate::operations::MAX_SUPPORTED_WRITER_VERSION),
..Default::default()
};
Action::protocol(protocol)
}
Expand Down Expand Up @@ -134,6 +135,7 @@ pub async fn create_initialized_table(
protocol: Protocol {
min_reader_version: 1,
min_writer_version: 1,
..Default::default()
},
metadata: DeltaTableMetaData::new(
None,
Expand Down
2 changes: 2 additions & 0 deletions rust/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ fn parquet_bytes_from_state(
let jsons = std::iter::once(Action::protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
reader_features: state.reader_features().into_iter().cloned().collect(),
writer_features: state.writer_features().into_iter().cloned().collect(),
}))
// metaData
.chain(std::iter::once(Action::metaData(MetaData::try_from(
Expand Down
10 changes: 8 additions & 2 deletions rust/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
Expand Down Expand Up @@ -637,9 +637,15 @@ pub struct Protocol {
/// Minimum version of the Delta read protocol a client must implement to correctly read the
/// table.
pub min_reader_version: i32,
/// Minimum version of the Delta write protocol a client must implement to correctly read the
/// Minimum version of the Delta write protocol a client must implement to correctly write the
/// table.
pub min_writer_version: i32,
/// A collection of reader features that a client must implement in order to correctly read the table.
#[serde(skip_serializing_if = "HashSet::is_empty", default)]
pub reader_features: HashSet<String>,
/// A collection of writer features that a client must implement in order to correctly write the table.
#[serde(skip_serializing_if = "HashSet::is_empty", default)]
pub writer_features: HashSet<String>,
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
Expand Down
10 changes: 10 additions & 0 deletions rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,16 @@ impl DeltaTable {
self.state.min_writer_version()
}

/// Returns the reader features recorded in the loaded table state.
///
/// These are the features a client must implement in order to correctly read the table
/// (they are requirements of the table's protocol, not capabilities of this library).
/// The set is borrowed from the underlying state; callers that need ownership must clone it.
pub fn get_reader_features(&self) -> &HashSet<String> {
self.state.reader_features()
}

/// Returns the writer features recorded in the loaded table state.
///
/// These are the features a client must implement in order to correctly write the table
/// (they are requirements of the table's protocol, not capabilities of this library).
/// The set is borrowed from the underlying state; callers that need ownership must clone it.
pub fn get_writer_features(&self) -> &HashSet<String> {
self.state.writer_features()
}

/// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
/// no metadata was found in the log.
pub fn schema(&self) -> Option<&Schema> {
Expand Down
32 changes: 32 additions & 0 deletions rust/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct DeltaTableState {
app_transaction_version: HashMap<String, i64>,
min_reader_version: i32,
min_writer_version: i32,
reader_features: HashSet<String>,
writer_features: HashSet<String>,
// table metadata corresponding to current version
current_metadata: Option<DeltaTableMetaData>,
// retention period for tombstones in milli-seconds
Expand Down Expand Up @@ -229,6 +231,16 @@ impl DeltaTableState {
self.min_writer_version
}

/// The set of reader features required by the protocol.
///
/// Returns a shared borrow of the state's `reader_features` field; nothing is cloned.
pub fn reader_features(&self) -> &HashSet<String> {
&self.reader_features
}

/// The set of writer features required by the protocol.
///
/// Returns a shared borrow of the state's `writer_features` field; nothing is cloned.
pub fn writer_features(&self) -> &HashSet<String> {
&self.writer_features
}

/// The most recent metadata of the table.
pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> {
self.current_metadata.as_ref()
Expand Down Expand Up @@ -290,6 +302,14 @@ impl DeltaTableState {
self.min_writer_version = new_state.min_writer_version;
}

for feature in new_state.reader_features.iter() {
self.reader_features.insert(feature.to_owned());
}

for feature in new_state.writer_features.iter() {
self.writer_features.insert(feature.to_owned());
}
Comment on lines +305 to +311
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear to me why we should take the union, and not just adopt the latest set of features. This would mean you could never drop features, right?


if new_state.current_metadata.is_some() {
self.tombstone_retention_millis = new_state.tombstone_retention_millis;
self.log_retention_millis = new_state.log_retention_millis;
Expand Down Expand Up @@ -339,6 +359,14 @@ impl DeltaTableState {
protocol::Action::protocol(v) => {
self.min_reader_version = v.min_reader_version;
self.min_writer_version = v.min_writer_version;

for feature in v.reader_features.iter() {
self.reader_features.insert(feature.to_owned());
}

for feature in v.writer_features.iter() {
self.writer_features.insert(feature.to_owned());
}
}
protocol::Action::metaData(v) => {
let md = DeltaTableMetaData::try_from(v)?;
Expand Down Expand Up @@ -421,6 +449,8 @@ mod tests {
app_transaction_version: Default::default(),
min_reader_version: 0,
min_writer_version: 0,
reader_features: Default::default(),
writer_features: Default::default(),
current_metadata: None,
tombstone_retention_millis: 0,
log_retention_millis: 0,
Expand All @@ -446,6 +476,8 @@ mod tests {
current_metadata: None,
min_reader_version: 1,
min_writer_version: 1,
reader_features: Default::default(),
writer_features: Default::default(),
app_transaction_version,
tombstone_retention_millis: 0,
log_retention_millis: 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1564524295023,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["columnMapping"],"writerFeatures":["columnMapping","identityColumns"]}}
{"metaData":{"id":"22ef18ba-191c-4c36-a606-3dad5cdf3830","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1564524294376}}
Loading