diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs
index 707f1daf02..ac5bab7738 100644
--- a/crates/core/src/operations/transaction/protocol.rs
+++ b/crates/core/src/operations/transaction/protocol.rs
@@ -2,6 +2,7 @@ use std::collections::HashSet;
 
 use lazy_static::lazy_static;
 use once_cell::sync::Lazy;
+use tracing::log::*;
 
 use super::{TableReference, TransactionError};
 use crate::kernel::{
@@ -148,17 +149,36 @@ impl ProtocolChecker {
     pub fn can_write_to(&self, snapshot: &dyn TableReference) -> Result<(), TransactionError> {
         // NOTE: writers must always support all required reader features
         self.can_read_from(snapshot)?;
+        let min_writer_version = snapshot.protocol().min_writer_version;
+
+        let required_features: Option<&HashSet<WriterFeatures>> = match min_writer_version {
+            0 | 1 => None,
+            2 => Some(&WRITER_V2),
+            3 => Some(&WRITER_V3),
+            4 => Some(&WRITER_V4),
+            5 => Some(&WRITER_V5),
+            6 => Some(&WRITER_V6),
+            _ => snapshot.protocol().writer_features.as_ref(),
+        };
 
-        let required_features: Option<&HashSet<WriterFeatures>> =
-            match snapshot.protocol().min_writer_version {
-                0 | 1 => None,
-                2 => Some(&WRITER_V2),
-                3 => Some(&WRITER_V3),
-                4 => Some(&WRITER_V4),
-                5 => Some(&WRITER_V5),
-                6 => Some(&WRITER_V6),
-                _ => snapshot.protocol().writer_features.as_ref(),
-            };
+        // For writer versions 4-6, refuse the write when any column's metadata carries a
+        // generation expression, since delta-rs cannot currently write to such tables.
+        // NOTE: a schema that fails to load is skipped here rather than reported as an error.
+        if (4..7).contains(&min_writer_version) {
+            debug!("min_writer_version is in the range 4-6, checking for unsupported table features");
+            if let Ok(schema) = snapshot.metadata().schema() {
+                for field in schema.fields.iter() {
+                    if field.metadata.contains_key(
+                        crate::kernel::ColumnMetadataKey::GenerationExpression.as_ref(),
+                    ) {
+                        error!("The table contains `delta.generationExpression` settings on columns which mean this table cannot be currently written to by delta-rs");
+                        return Err(TransactionError::UnsupportedWriterFeatures(vec![
+                            WriterFeatures::GeneratedColumns,
+                        ]));
+                    }
+                }
+            }
+        }
 
         if let Some(features) = required_features {
             let mut diff = features.difference(&self.writer_features).peekable();
@@ -250,7 +270,8 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
 mod tests {
     use super::super::test_utils::create_metadata_action;
     use super::*;
-    use crate::kernel::{Action, Add, Protocol, Remove};
+    use crate::kernel::DataType as DeltaDataType;
+    use crate::kernel::{Action, Add, PrimitiveType, Protocol, Remove};
     use crate::protocol::SaveMode;
     use crate::table::state::DeltaTableState;
     use crate::DeltaConfigKey;
@@ -559,4 +580,67 @@ mod tests {
         assert!(checker_7.can_read_from(eager_7).is_ok());
         assert!(checker_7.can_write_to(eager_7).is_ok());
     }
+
+    /// A minWriterVersion=4 table that lists the change data feed feature must stay writable.
+    #[tokio::test]
+    async fn test_minwriter_v4_with_cdf() {
+        let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
+        let actions = vec![
+            Action::Protocol(
+                Protocol::new(2, 4)
+                    .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]),
+            ),
+            create_metadata_action(None, Some(HashMap::new())),
+        ];
+        let snapshot_4 = DeltaTableState::from_actions(actions).unwrap();
+        let eager_4 = snapshot_4.snapshot();
+        assert!(checker_4.can_write_to(eager_4).is_ok());
+    }
+
+    /// Technically we do not yet support generated columns, but it is okay to "accept" writing to
+    /// a table with minWriterVersion=4 and the generated columns feature: as long as no column
+    /// actually defines `delta.generationExpression`, the write is still allowed.
+    #[tokio::test]
+    async fn test_minwriter_v4_with_generated_columns() {
+        let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
+        let actions = vec![
+            Action::Protocol(
+                Protocol::new(2, 4)
+                    .with_writer_features(vec![crate::kernel::WriterFeatures::GeneratedColumns]),
+            ),
+            create_metadata_action(None, Some(HashMap::new())),
+        ];
+        let snapshot_4 = DeltaTableState::from_actions(actions).unwrap();
+        let eager_4 = snapshot_4.snapshot();
+        assert!(checker_4.can_write_to(eager_4).is_ok());
+    }
+
+    /// A column that defines `delta.generationExpression` must make the write check fail.
+    #[tokio::test]
+    async fn test_minwriter_v4_with_generated_columns_and_expressions() {
+        let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
+        let actions = vec![Action::Protocol(Protocol::new(2, 4).with_writer_features(
+            vec![crate::kernel::WriterFeatures::GeneratedColumns],
+        ))];
+
+        let table: crate::DeltaTable = crate::DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                Some(HashMap::from([(
+                    "delta.generationExpression".into(),
+                    "x IS TRUE".into(),
+                )])),
+            )
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .expect("failed to make a version 4 table with EnableChangeDataFeed");
+        let eager_4 = table
+            .snapshot()
+            .expect("Failed to get snapshot from test table");
+        assert!(checker_4.can_write_to(eager_4).is_err());
+    }
 }