From 070c49b366debbde154cbde34fc91adfa1e30ca0 Mon Sep 17 00:00:00 2001 From: Chewing Glass Date: Tue, 27 Jun 2023 21:35:45 -0500 Subject: [PATCH] fix(#1493): Avoid writing statistics for binary columns to fix JSON error --- rust/src/action/checkpoints.rs | 36 ++++++++++++++++-- rust/tests/checkpoint_writer.rs | 67 +++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/rust/src/action/checkpoints.rs b/rust/src/action/checkpoints.rs index 18054fa1ba..5d21b94238 100644 --- a/rust/src/action/checkpoints.rs +++ b/rust/src/action/checkpoints.rs @@ -281,9 +281,35 @@ pub async fn cleanup_expired_logs_for( } } -fn parquet_bytes_from_state( - state: &DeltaTableState, -) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> { +// Filter binary from the schema so that it isn't serialized into JSON, +// as arrow currently does not support this. +// https://github.com/delta-io/delta-rs/issues/1493 +fn filter_binary(schema: &Schema) -> Schema { + Schema::new( + schema + .get_fields() + .into_iter() + .flat_map(|f| match f.get_type() { + SchemaDataType::primitive(p) => { + if p != "binary" { + Some(f.clone()) + } else { + None + } + } + SchemaDataType::r#struct(s) => Some(SchemaField::new( + f.get_name().to_string(), + SchemaDataType::r#struct(filter_binary(&Schema::new(s.get_fields().clone()))), + f.is_nullable(), + f.get_metadata().clone(), + )), + _ => Some(f.clone()), + }) + .collect::<Vec<_>>(), + ) +} + +fn parquet_bytes_from_state(state: &DeltaTableState) -> Result { let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?; let partition_col_data_types = current_metadata.get_partition_col_data_types(); @@ -353,9 +379,11 @@ fn parquet_bytes_from_state( checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) })); + let filterd_schema = filter_binary(&current_metadata.schema); + // Create the arrow schema that represents the Checkpoint parquet file. 
let arrow_schema = delta_log_schema_for_table( - <ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?, + <ArrowSchema as TryFrom<&Schema>>::try_from(&filterd_schema)?, current_metadata.partition_columns.as_slice(), use_extended_remove_schema, ); diff --git a/rust/tests/checkpoint_writer.rs b/rust/tests/checkpoint_writer.rs index 80067814dd..bc92615076 100644 --- a/rust/tests/checkpoint_writer.rs +++ b/rust/tests/checkpoint_writer.rs @@ -8,10 +8,77 @@ mod fs_common; #[cfg(all(feature = "arrow", feature = "parquet"))] mod simple_checkpoint { + use arrow::datatypes::Schema as ArrowSchema; + use arrow_array::{BinaryArray, RecordBatch}; + use arrow_schema::{DataType, Field}; + use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::*; use pretty_assertions::assert_eq; + use std::collections::HashMap; + use std::error::Error; use std::fs; use std::path::{Path, PathBuf}; + use std::sync::Arc; + + struct Context { + pub table: DeltaTable, + } + + async fn setup_test() -> Result<Context, Box<dyn Error>> { + let columns = vec![SchemaField::new( + "x".to_owned(), + SchemaDataType::primitive("binary".to_owned()), + false, + HashMap::new(), + )]; + + let tmp_dir = tempdir::TempDir::new("opt_table").unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + let dt = DeltaOps::try_from_uri(table_uri) + .await? + .create() + .with_columns(columns) + .await?; + + Ok(Context { table: dt }) + } + + fn get_batch(items: Vec<&[u8]>) -> Result<RecordBatch, Box<dyn Error>> { + let x_array = BinaryArray::from(items); + + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![Field::new( + "x", + DataType::Binary, + false, + )])), + vec![Arc::new(x_array)], + )?) 
+ } + + async fn write( + writer: &mut RecordBatchWriter, + table: &mut DeltaTable, + batch: RecordBatch, + ) -> Result<(), DeltaTableError> { + writer.write(batch).await?; + writer.flush_and_commit(table).await?; + Ok(()) + } + + #[tokio::test] + async fn test_checkpoint_write_binary_stats() -> Result<(), Box<dyn Error>> { + let context = setup_test().await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write(&mut writer, &mut dt, get_batch(vec![&[1, 2]])?).await?; + + // Just checking that this doesn't fail. https://github.com/delta-io/delta-rs/issues/1493 + checkpoints::create_checkpoint(&dt).await?; + + Ok(()) + } #[tokio::test] async fn simple_checkpoint_test() {