fix(delta-io#1493): Avoid writing statistics for binary columns to fix JSON error
ChewingGlass committed Jun 28, 2023
1 parent e5dd8e2 commit 6cdcb67
Showing 2 changed files with 96 additions and 4 deletions.
33 changes: 29 additions & 4 deletions rust/src/action/checkpoints.rs
@@ -281,9 +281,32 @@ pub async fn cleanup_expired_logs_for(
    }
}

fn parquet_bytes_from_state(
    state: &DeltaTableState,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
fn filter_binary(schema: &Schema) -> Schema {
    Schema::new(
        schema
            .get_fields()
            .into_iter()
            .flat_map(|f| match f.get_type() {
                SchemaDataType::primitive(p) => {
                    if p != "binary" {
                        Some(f.clone())
                    } else {
                        None
                    }
                }
                SchemaDataType::r#struct(s) => Some(SchemaField::new(
                    f.get_name().to_string(),
                    SchemaDataType::r#struct(filter_binary(&Schema::new(s.get_fields().clone()))),
                    f.is_nullable(),
                    f.get_metadata().clone(),
                )),
                _ => Some(f.clone()),
            })
            .collect::<Vec<_>>(),
    )
}

fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, ProtocolError> {
    let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;

    let partition_col_data_types = current_metadata.get_partition_col_data_types();
@@ -353,9 +376,11 @@ fn parquet_bytes_from_state(
            checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
        }));

    let filterd_schema = filter_binary(&current_metadata.schema);

    // Create the arrow schema that represents the Checkpoint parquet file.
    let arrow_schema = delta_log_schema_for_table(
        <ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
        <ArrowSchema as TryFrom<&Schema>>::try_from(&filterd_schema)?,
        current_metadata.partition_columns.as_slice(),
        use_extended_remove_schema,
    );
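For context, the new `filter_binary` helper drops `binary` primitive fields (recursively through struct fields) from the schema that feeds the checkpoint's Arrow conversion, so no min/max statistics columns are generated for them. Below is a minimal illustrative sketch of the schema it operates on, not part of the commit; it assumes the `deltalake` crate of this era re-exports `Schema`, `SchemaField`, and `SchemaDataType` at the crate root, as the test's `use deltalake::*;` suggests, and note that `filter_binary` itself is private to `checkpoints.rs`, so it is only described in a comment here.

```rust
use std::collections::HashMap;

use deltalake::{Schema, SchemaDataType, SchemaField};

fn main() {
    // A table schema with one string column and one binary column,
    // built with the same constructors used elsewhere in this diff.
    let schema = Schema::new(vec![
        SchemaField::new(
            "id".to_string(),
            SchemaDataType::primitive("string".to_string()),
            false,
            HashMap::new(),
        ),
        SchemaField::new(
            "payload".to_string(),
            SchemaDataType::primitive("binary".to_string()),
            false,
            HashMap::new(),
        ),
    ]);

    // filter_binary(&schema) would keep only "id"; the binary "payload"
    // field is removed, so the checkpoint schema carries no stats columns
    // for it and the JSON serialization issue from delta-io#1493 is avoided.
    assert_eq!(schema.get_fields().len(), 2);
}
```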
67 changes: 67 additions & 0 deletions rust/tests/checkpoint_writer.rs
@@ -8,10 +8,77 @@ mod fs_common;

#[cfg(all(feature = "arrow", feature = "parquet"))]
mod simple_checkpoint {
    use arrow::datatypes::Schema as ArrowSchema;
    use arrow_array::{BinaryArray, RecordBatch};
    use arrow_schema::{DataType, Field};
    use deltalake::writer::{DeltaWriter, RecordBatchWriter};
    use deltalake::*;
    use pretty_assertions::assert_eq;
    use std::collections::HashMap;
    use std::error::Error;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::Arc;

    struct Context {
        pub table: DeltaTable,
    }

    async fn setup_test() -> Result<Context, Box<dyn Error>> {
        let columns = vec![SchemaField::new(
            "x".to_owned(),
            SchemaDataType::primitive("binary".to_owned()),
            false,
            HashMap::new(),
        )];

        let tmp_dir = tempdir::TempDir::new("opt_table").unwrap();
        let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
        let dt = DeltaOps::try_from_uri(table_uri)
            .await?
            .create()
            .with_columns(columns)
            .await?;

        Ok(Context { table: dt })
    }

    fn get_batch(items: Vec<&[u8]>) -> Result<RecordBatch, Box<dyn Error>> {
        let x_array = BinaryArray::from(items);

        Ok(RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![Field::new(
                "x",
                DataType::Binary,
                false,
            )])),
            vec![Arc::new(x_array)],
        )?)
    }

    async fn write(
        writer: &mut RecordBatchWriter,
        table: &mut DeltaTable,
        batch: RecordBatch,
    ) -> Result<(), DeltaTableError> {
        writer.write(batch).await?;
        writer.flush_and_commit(table).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_checkpoint_write_binary_stats() -> Result<(), Box<dyn Error>> {
        let context = setup_test().await?;
        let mut dt = context.table;
        let mut writer = RecordBatchWriter::for_table(&dt)?;

        write(&mut writer, &mut dt, get_batch(vec![&[1, 2]])?).await?;

        // Just checking that this doesn't fail. https://github.com/delta-io/delta-rs/issues/1493
        checkpoints::create_checkpoint(&dt).await?;

        Ok(())
    }

    #[tokio::test]
    async fn simple_checkpoint_test() {
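As a possible follow-up to the "just doesn't fail" check in the new test, one could also assert that the checkpoint file was actually written. A minimal sketch, not part of this commit; it assumes the standard Delta log layout of a zero-padded 20-digit version plus `.checkpoint.parquet` under `_delta_log/`, and `checkpoint_exists` is a hypothetical helper name.

```rust
use std::path::Path;

// Hypothetical helper: true if a single-file checkpoint for `version` exists
// under the table's `_delta_log/` directory, per the standard Delta naming
// convention (e.g. 00000000000000000001.checkpoint.parquet).
fn checkpoint_exists(table_uri: &str, version: i64) -> bool {
    let name = format!("{version:020}.checkpoint.parquet");
    Path::new(table_uri).join("_delta_log").join(name).exists()
}
```

In the test above this could be invoked as `assert!(checkpoint_exists(table_uri, dt.version()));`, provided the temporary table URI is kept in scope alongside the table handle.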
