Datafusion can't seem to cast evolving structs #14757

Open
TheBuilderJR opened this issue Feb 19, 2025 · 9 comments
Labels
bug Something isn't working

Comments

@TheBuilderJR
Contributor

TheBuilderJR commented Feb 19, 2025

Describe the bug

I'd expect that, as I add fields to a struct, I should be able to cast the older struct type to the newer one. The repro below shows that this doesn't seem to be allowed:

To Reproduce

use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray, Float64Array};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));
    
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;

    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);
    
    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "reason",
                    DataType::Struct(vec![
                        Field::new("_level", DataType::Float64, true),
                        Field::new(
                            "details",
                            DataType::Struct(vec![
                                Field::new("rurl", DataType::Utf8, true),
                                Field::new("s", DataType::Float64, true),
                                Field::new("t", DataType::Utf8, true),
                            ].into()),
                            true,
                        ),
                    ].into()),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(vec![
                            Field::new("_level", DataType::Float64, true),
                            Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            ),
                        ].into()),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("https://example.com")])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;

    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);
    
    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?
    )
        .with_schema(schema2.as_ref().clone().into())
        .infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![
                col("timestamp_utc").sort(true, true),
            ]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let results = df.clone().collect().await?;

    assert_eq!(results[0].num_rows(), 2);

    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;

    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema2.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;
    
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let compacted_results = df.collect().await?;
    
    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}

produces

Error: Plan("Cannot cast file schema field additionalInfo of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"reason\", data_type: Struct([Field { name: \"_level\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"details\", data_type: Struct([Field { name: \"rurl\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"s\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])

Expected behavior

I expected the test to pass.

Additional context

No response

@TheBuilderJR TheBuilderJR added the bug Something isn't working label Feb 19, 2025
@TheBuilderJR
Contributor Author

TheBuilderJR commented Feb 19, 2025

cc @alamb @zhuqi-lucas Many of my users can't query their data because of this evolution. Any chance you can take a look to see if there's any workaround I can do for now?

@alamb
Contributor

alamb commented Feb 19, 2025

> cc @alamb @zhuqi-lucas Many of my users can't query their data because of this evolution. Any chance you can take a look to see if there's any workaround I can do for now?

I don't know of a workaround here.

This seems like a feature we would have to implement (likely in arrow-rs's cast kernel first).

We had some discussion on the semantics of struct casting recently.

@TheBuilderJR
Contributor Author

@alamb perhaps a subtask is making Schema::try_merge consistent with DataFusion's ability to query these merged schemas. Schema::try_merge currently happily merges these structs, which leaves me, as an end user, in a danger zone: data has been mutated on the assumption that it can be queried, when it can't.
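
To make the mismatch concrete, here is a minimal sketch of a permissive, name-based struct merge. It uses a toy `Ty` enum and a hypothetical `merge` function invented for illustration; it is not the arrow-rs `DataType` or the actual `Schema::try_merge` implementation, only a model of the behavior described above, where the merged result is the superset struct.

```rust
/// Toy model of a nested field type. This is NOT arrow-rs's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Utf8,
    Float64,
    Struct(Vec<(String, Ty)>),
}

/// Hypothetical permissive merge: identical leaf types merge to themselves,
/// structs merge field by field (matched by name), and a field present on
/// only one side is kept in the result.
fn merge(a: &Ty, b: &Ty) -> Result<Ty, String> {
    match (a, b) {
        (Ty::Struct(fa), Ty::Struct(fb)) => {
            let mut out = fa.clone();
            for (name, tb) in fb {
                match out.iter_mut().find(|(n, _)| n == name) {
                    // Field exists on both sides: merge the child types.
                    Some((_, ta)) => *ta = merge(ta, tb)?,
                    // Field exists only in `b`: append it.
                    None => out.push((name.clone(), tb.clone())),
                }
            }
            Ok(Ty::Struct(out))
        }
        _ if a == b => Ok(a.clone()),
        _ => Err(format!("cannot merge {:?} with {:?}", a, b)),
    }
}
```

Under this model, merging the older `additionalInfo` struct with the newer one succeeds and yields the newer (superset) type, so a merge that succeeds says nothing about whether a file written with the older struct can be cast to the merged type at read time.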

@zhuqi-lucas
Contributor

I can't find a workaround for this either, and I think Schema::try_merge succeeds before this error is raised.

So when we call map_schema at the end, we should still investigate why the cast check fails. I'm not familiar with that checking logic, so I need help from other folks.

@TheBuilderJR
Contributor Author

@alamb how do y'all handle this at influx? This one comes as quite a shocker to me. Does no one else using datafusion support struct evolution?

@alamb
Contributor

alamb commented Feb 22, 2025

> @alamb how do y'all handle this at influx? This one comes as quite a shocker to me. Does no one else using datafusion support struct evolution?

InfluxData doesn't have a struct datatype, so it doesn't come up.

Basically I think extending the schema merging and casting logic to handle struct evolution sounds like a good idea to me. Eventually the code probably belongs in arrow-rs but we could try to implement it first in DataFusion and then port it upstream

Can someone file / find a ticket upstream in arrow-rs to get the conversation started there?
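
As a rough illustration of what a cast that handles struct evolution could mean, here is a sketch on a toy value model. The `Ty` and `Val` enums and the `cast_evolving` function are hypothetical names invented for this example, not arrow-rs or DataFusion APIs. The semantics are an assumption: fields are matched by name, fields missing from the source become null, and source fields absent from the target are dropped.

```rust
/// Toy type and value models. These are NOT arrow-rs types.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Utf8,
    Float64,
    Struct(Vec<(String, Ty)>),
}

#[derive(Debug, Clone, PartialEq)]
enum Val {
    Null,
    Utf8(String),
    Float64(f64),
    Struct(Vec<(String, Val)>),
}

/// Hypothetical evolution-aware cast of one value to a target type.
fn cast_evolving(v: &Val, target: &Ty) -> Result<Val, String> {
    match (v, target) {
        // Every field in this sketch is nullable, so null casts to anything.
        (Val::Null, _) => Ok(Val::Null),
        (Val::Utf8(_), Ty::Utf8) | (Val::Float64(_), Ty::Float64) => Ok(v.clone()),
        (Val::Struct(src), Ty::Struct(fields)) => {
            let mut out = Vec::with_capacity(fields.len());
            for (name, ty) in fields {
                let child = match src.iter().find(|(n, _)| n == name) {
                    // Field present in the source: cast it recursively.
                    Some((_, child)) => cast_evolving(child, ty)?,
                    // Field added by a later schema: fill with null.
                    None => Val::Null,
                };
                out.push((name.clone(), child));
            }
            Ok(Val::Struct(out))
        }
        _ => Err(format!("cannot cast {:?} to {:?}", v, target)),
    }
}
```

A real implementation would operate on whole columns (for example, building a null array for each missing child) rather than on single values, and would need to settle whether dropping source-only fields is acceptable.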

@TheBuilderJR
Contributor Author

@alamb given that the arrow folks don't seem super motivated to fix this in a timely manner, can we do a fix on the datafusion side? Maybe the fix is we can try to do an arrow cast, but if it fails we fall back to a datafusion cast specialized to this struct use case?

@alamb
Contributor

alamb commented Feb 24, 2025

> @alamb given that the arrow folks don't seem super motivated to fix this in a timely manner, can we do a fix on the datafusion side?

Yes, of course!

> Maybe the fix is we can try to do an arrow cast, but if it fails we fall back to a datafusion cast specialized to this struct use case?

Yes I think that would be a good idea! Can you perhaps work on a PR?

I think it will be related to #14396 as well, so it may be worth taking a look at that and helping @Lordworms with it.
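
The "try the existing cast, then fall back to a struct-specialized one" idea can be sketched at the level of a castability check. Everything here is hypothetical: `Ty` is a toy model rather than arrow-rs's `DataType`, `strict_can_cast` merely stands in for the existing check (modeled as requiring identical types), and `struct_can_cast` assumes name-based matching where fields found only in the target are later filled with nulls.

```rust
/// Toy model of a nested field type. This is NOT arrow-rs's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Utf8,
    Float64,
    Struct(Vec<(String, Ty)>),
}

/// Stand-in for the existing strict check: in this sketch, types must match exactly.
fn strict_can_cast(from: &Ty, to: &Ty) -> bool {
    from == to
}

/// Hypothetical struct-specialized check: every source field that also
/// appears in the target must itself be castable. Fields found only in the
/// target are allowed (they would be null-filled); fields found only in the
/// source are allowed (they would be dropped).
fn struct_can_cast(from: &Ty, to: &Ty) -> bool {
    match (from, to) {
        (Ty::Struct(src), Ty::Struct(dst)) => src.iter().all(|(name, st)| {
            dst.iter()
                .find(|(n, _)| n == name)
                .map_or(true, |(_, dt)| can_cast(st, dt))
        }),
        _ => false,
    }
}

/// Try the strict check first; fall back to the specialized one only if it fails.
fn can_cast(from: &Ty, to: &Ty) -> bool {
    strict_can_cast(from, to) || struct_can_cast(from, to)
}
```

Keeping the strict path first means existing behavior is unchanged wherever it already succeeds, and the specialized path only widens what is accepted for structs.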

@Lordworms
Contributor

I forgot to rebase the main branch, but I can refactor it to use arrow-cast + specialized case
