
DataFusion listing table evolution is dependent on file order #14755

Open
TheBuilderJR opened this issue Feb 18, 2025 · 3 comments
Labels
bug Something isn't working

Comments

@TheBuilderJR
Contributor

Describe the bug

Schema evolution in a listing table should be agnostic to file order, but it currently requires the files to be listed in the order their schemas evolved. In the repro below the paths are passed out of order (4, 2, 3, 1) and the query fails; reordering them to 1, 2, 3, 4 makes it pass, but not vice versa.
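The only difference between the failing and passing cases is the order of paths handed to ListingTableConfig::new_with_multi_paths. A minimal sketch (path1 through path4 are the parquet files written in the full repro below):

// Fails: files listed out of schema-evolution order (as in the repro below).
let paths_str = vec![path4.to_string(), path2.to_string(), path3.to_string(), path1.to_string()];

// Passes: files listed in the order their schemas evolved.
let paths_str = vec![path1.to_string(), path2.to_string(), path3.to_string(), path4.to_string()];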

To Reproduce

use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

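    // File 1: flat schema with just `event` and `timestamp_utc`.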
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
    ]));
    
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event1"])),
            Arc::new(TimestampMillisecondArray::from(vec![1640995200000]))
        ]
    )?;

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);
    
    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

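    // File 2: adds a `data` struct column (`some_data: Utf8`).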
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("some_data", DataType::Utf8, false)
        ].into()), false),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event2"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641081600000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("some_data", DataType::Utf8, false)),
                Arc::new(StringArray::from(vec!["additional_data"])) as Arc<dyn Array>
            )]))
        ]
    )?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);
    
    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

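    // File 3: nests `data` one level deeper (`even_more_nested_data.some_data`).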
    let schema3 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("even_more_nested_data", DataType::Struct(vec![
                Field::new("some_data", DataType::Utf8, false)
            ].into()), false)
        ].into()), false),
    ]));

    let batch3 = RecordBatch::try_new(
        schema3.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event3"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641168000000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("even_more_nested_data", DataType::Struct(vec![
                    Field::new("some_data", DataType::Utf8, false)
                ].into()), false)),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("some_data", DataType::Utf8, false)),
                    Arc::new(StringArray::from(vec!["deeply_nested_value"])) as Arc<dyn Array>
                )])) as Arc<dyn Array>
            )]))
        ]
    )?;

    let path3 = "test_data3.parquet";
    let _ = fs::remove_file(path3);
    
    let df3 = ctx.read_batch(batch3)?;
    df3.write_parquet(
        path3,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

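    // File 4: nests one level deeper still (`even_more_nested_data.some_data.deepest_data`).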
    let schema4 = Arc::new(Schema::new(vec![
        Field::new("event", DataType::Utf8, false),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("data", DataType::Struct(vec![
            Field::new("even_more_nested_data", DataType::Struct(vec![
                Field::new("some_data", DataType::Struct(vec![
                    Field::new("deepest_data", DataType::Utf8, false)
                ].into()), false)
            ].into()), false)
        ].into()), false),
    ]));

    let batch4 = RecordBatch::try_new(
        schema4.clone(),
        vec![
            Arc::new(StringArray::from(vec!["event4"])),
            Arc::new(TimestampMillisecondArray::from(vec![1641254400000])),
            Arc::new(StructArray::from(vec![(
                Arc::new(Field::new("even_more_nested_data", DataType::Struct(vec![
                    Field::new("some_data", DataType::Struct(vec![
                        Field::new("deepest_data", DataType::Utf8, false)
                    ].into()), false)
                ].into()), false)),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("some_data", DataType::Struct(vec![
                        Field::new("deepest_data", DataType::Utf8, false)
                    ].into()), false)),
                    Arc::new(StructArray::from(vec![(
                        Arc::new(Field::new("deepest_data", DataType::Utf8, false)),
                        Arc::new(StringArray::from(vec!["super_deeply_nested_value"])) as Arc<dyn Array>
                    )])) as Arc<dyn Array>
                )])) as Arc<dyn Array>
            )]))
        ]
    )?;

    let path4 = "test_data4.parquet";
    let _ = fs::remove_file(path4);
    
    let df4 = ctx.read_batch(batch4)?;
    df4.write_parquet(
        path4,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

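    // Register all four files as a single listing table, deliberately out of schema-evolution order.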
    let paths_str = vec![path4.to_string(), path2.to_string(), path3.to_string(), path1.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?
    )
        .with_schema(schema4.as_ref().clone().into())
        .infer(&ctx.state()).await?;

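    // Override the listing options to declare an explicit file sort order.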
    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![
                col("timestamp_utc").sort(true, true),
            ]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY event").await?;
    let results = df.clone().collect().await?;

    assert_eq!(results[0].num_rows(), 4);

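    // Compact all rows into a single parquet file.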
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

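    // Re-read the compacted file in a fresh context and compare with the original results.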
    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema4.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;
    
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY event").await?;
    let compacted_results = df.collect().await?;
    
    assert_eq!(compacted_results[0].num_rows(), 4);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(path3);
    let _ = fs::remove_file(path4);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}

produces the following error (the scan cannot cast the flat `data` struct written by the second file to the fully nested `data` type of the table schema):

Error: Plan("Cannot cast file schema field data of type Struct([Field { name: \"some_data\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"even_more_nested_data\", data_type: Struct([Field { name: \"some_data\", data_type: Struct([Field { name: \"deepest_data\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])

Expected behavior

It should work irrespective of file order.

Additional context

No response

@alamb
Contributor

alamb commented Feb 19, 2025

Improving schema evolution seems like a good thing to work on in my opinion

@zhuqi-lucas
Contributor

I will try to take a look at this issue.

@TheBuilderJR
Contributor Author

@zhuqi-lucas #14757 is much more urgent. If you could take a look at that first, that'd be great.
