panic when GROUP BY column order doesn't match USING column order #4873

Closed
jonmmease opened this issue Jan 11, 2023 · 4 comments · Fixed by #4878
Labels
bug Something isn't working

Comments

@jonmmease (Contributor)

Describe the bug
VegaFusion implements the Vega impute transform with a query that uses a CROSS JOIN to build a table of all combinations of the unique values of certain input columns, then a LEFT OUTER JOIN to a subquery that groups by those same columns. The LEFT OUTER JOIN includes a USING constraint on the same columns that appear in the subquery's GROUP BY.

When the order of the columns in the GROUP BY doesn't match the order of the columns in the USING constraint, the query panics. If the columns are specified in the same order, the query completes successfully.

Example
For example, here is an input table with three columns.

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| A    | 1    | 1    |
| A    | 1    | 1    |
| A    | 2    | 1    |
| A    | 2    | 1    |
| A    | 3    | 1    |
| A    | 3    | 1    |
| A    | 4    | 1    |
| A    | 4    | 1    |
| BB   | 5    | 1    |
| BB   | 5    | 1    |
| BB   | 6    | 1    |
| BB   | 6    | 1    |
+------+------+------+

Here is a working query:

SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3"
USING("col1", "col2")
+------+------+----------+
| col1 | col2 | sum_col3 |
+------+------+----------+
| BB   | 5    | 2        |
| BB   | 1    | 0        |
| BB   | 4    | 0        |
| BB   | 6    | 2        |
| BB   | 3    | 0        |
| A    | 3    | 2        |
| BB   | 2    | 0        |
| A    | 2    | 2        |
| A    | 1    | 2        |
| A    | 5    | 0        |
| A    | 6    | 0        |
| A    | 4    | 2        |
+------+------+----------+

The result contains one row for every combination of the unique values in col1 and col2. The sum_col3 column contains the sum of the input col3 values for that combination of col1 and col2, with zeros filled in for combinations that weren't present in the input table.

Now modify the query by flipping the order of col1 and col2 in the USING constraint (from USING("col1", "col2") to USING("col2", "col1")). The query now panics:

SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3"
USING("col2", "col1")
thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called `Option::unwrap()` on a `None` value', /Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17
stack backtrace:
...

If the order of the columns in the GROUP BY query is also flipped (from GROUP BY "col1", "col2" to GROUP BY "col2", "col1"), the query works again:

SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col2", "col1") AS "subq3"
USING("col2", "col1")
+------+------+----------+
| col1 | col2 | sum_col3 |
+------+------+----------+
| BB   | 5    | 2        |
| BB   | 1    | 0        |
| BB   | 4    | 0        |
| A    | 3    | 2        |
| BB   | 2    | 0        |
| BB   | 6    | 2        |
| BB   | 3    | 0        |
| A    | 2    | 2        |
| A    | 6    | 0        |
| A    | 4    | 2        |
| A    | 1    | 2        |
| A    | 5    | 0        |
+------+------+----------+

To Reproduce
Here is a Rust test that implements the two queries above:

#[cfg(test)]
mod test_cross_join_bug2 {
    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, StringArray, UInt64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::arrow::util::pretty::pretty_format_batches;
    use datafusion::datasource::MemTable;
    use datafusion::prelude::SessionContext;

    #[tokio::test]
    async fn count_distinct_error() {
        let col1 = Arc::new(StringArray::from(vec![
            "A", "A", "A", "A",
            "A", "A", "A", "A",
            "BB", "BB", "BB", "BB",
        ])) as ArrayRef;

        let col2 = Arc::new(UInt64Array::from(vec![
            1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6
        ])) as ArrayRef;

        let col3 = Arc::new(UInt64Array::from(vec![
            1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
        ])) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![
            Field::new("col1", DataType::Utf8, true),
            Field::new("col2", DataType::UInt64, true),
            Field::new("col3", DataType::UInt64, true),
        ])) as SchemaRef;

        let batch =
            RecordBatch::try_new(schema.clone(), vec![
                col1, col2, col3,
            ]).unwrap();
        let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();

        // Create context and register table
        let ctx = SessionContext::new();
        ctx.register_table("tbl", Arc::new(mem_table)).unwrap();

        // Pretty print the input table
        let res = ctx
            .sql("SELECT * from tbl")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        let formatted = pretty_format_batches(res.as_slice()).unwrap();
        println!("{}", formatted);

        // Perform query where GROUP BY and USING both specify "col1", "col2" (works)
        let sql1 = r#"
SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3"
USING("col1", "col2")
    "#;

        let res = ctx.sql(sql1).await.unwrap().collect().await.unwrap();
        let formatted = pretty_format_batches(res.as_slice()).unwrap();
        println!("{}", formatted);

        // Perform query where GROUP BY specifies "col1", "col2" and USING specifies "col2", "col1" (panic)
        let sql2 = r#"
SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3"
USING("col2", "col1")
    "#;

        let res = ctx.sql(sql2).await.unwrap().collect().await.unwrap();
        let formatted = pretty_format_batches(res.as_slice()).unwrap();
        println!("{}", formatted);
    }
}
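
To reproduce the output below, something like cargo test count_distinct_error -- --nocapture should work, assuming the test lives in a crate that depends on datafusion and tokio (the latter with its macros feature enabled for #[tokio::test]).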

Output

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| A    | 1    | 1    |
| A    | 1    | 1    |
| A    | 2    | 1    |
| A    | 2    | 1    |
| A    | 3    | 1    |
| A    | 3    | 1    |
| A    | 4    | 1    |
| A    | 4    | 1    |
| BB   | 5    | 1    |
| BB   | 5    | 1    |
| BB   | 6    | 1    |
| BB   | 6    | 1    |
+------+------+------+
+------+------+----------+
| col1 | col2 | sum_col3 |
+------+------+----------+
| A    | 3    | 2        |
| BB   | 2    | 0        |
| BB   | 6    | 2        |
| BB   | 3    | 0        |
| BB   | 5    | 2        |
| BB   | 1    | 0        |
| BB   | 4    | 0        |
| A    | 4    | 2        |
| A    | 2    | 2        |
| A    | 6    | 0        |
| A    | 1    | 2        |
| A    | 5    | 0        |
+------+------+----------+

thread 'test_cross_join_bug2::count_distinct_error' panicked at 'called `Option::unwrap()` on a `None` value', /Users/jonmmease/.cargo/git/checkouts/arrow-datafusion-71ae82d9dec9a01c/556282a/datafusion/core/src/physical_plan/joins/hash_join.rs:923:17

Expected behavior
These two queries should both complete successfully, regardless of whether the columns are specified in the same order in the GROUP BY and USING constraints.

@jonmmease (Contributor, Author) commented Jan 11, 2023

Notes:

If I comment out the EnforceDistribution physical optimizer rule then the error goes away.
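
For anyone who wants to try that experiment without editing DataFusion itself, here is a minimal sketch of filtering the rule out of a SessionState by name. It assumes the API from around this era (SessionState::with_config_rt, a public physical_optimizers field, and SessionContext::with_state); the exact builder API has shifted across releases, so treat it as illustrative only.

use std::sync::Arc;
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;

fn main() {
    // Build a state with the default rule set, then drop EnforceDistribution
    // by name. Both with_config_rt and the public physical_optimizers field
    // are assumptions about the DataFusion version in use.
    let mut state = SessionState::with_config_rt(
        SessionConfig::new(),
        Arc::new(RuntimeEnv::default()),
    );
    state
        .physical_optimizers
        .retain(|rule| rule.name() != "EnforceDistribution");
    let ctx = SessionContext::with_state(state);
    // ... register "tbl" and run the queries against ctx as in the test above
    let _ = ctx;
}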

The physical plans are very different. Here's an online diff, where the Left is the working query and Right is the query with USING column order flipped: https://www.diffchecker.com/3jfwJyrn/
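
As a pointer for reproducing that comparison: the plans can be printed by prefixing each query variant with EXPLAIN, e.g.:

EXPLAIN
SELECT "col1", "col2", coalesce("sum_col3", 0) as sum_col3
FROM (SELECT DISTINCT "col2" FROM "tbl") AS "subq1"
CROSS JOIN (SELECT DISTINCT "col1" FROM "tbl") AS "subq2"
LEFT OUTER JOIN (SELECT "col1", "col2", sum("col3") as "sum_col3" FROM "tbl" GROUP BY "col1", "col2") AS "subq3"
USING("col2", "col1")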

The physical plan for the broken query appears to have the col1/col2 data types flipped in the schema in a couple of places:

[Screenshot: excerpt of the broken physical plan showing col1/col2 data types swapped in the schema]

This probably explains the unwrap from the panic message at line 923 of hash_join.rs:

https://github.com/apache/arrow-datafusion/blob/39d98f8f4528f408c3cc8a03ee1fe7ecd990a35f/datafusion/core/src/physical_plan/joins/hash_join.rs#L922-L924

This equal_rows_elem! macro expands to:

{
    let left_array = l.as_any().downcast_ref::<UInt64Array>().unwrap();
    let right_array = r.as_any().downcast_ref::<UInt64Array>().unwrap();

    match (left_array.is_null(left), right_array.is_null(right)) {
        (false, false) => left_array.value(left) == right_array.value(right),
        (true, true) => null_equals_null,
        _ => false,
    }
}

This expansion performs array type downcasts, and one of them presumably fails because the schema is incorrect.
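
To illustrate the failure mode, here is a minimal standalone sketch (not from the codebase) of the downcast the macro performs: if the schema wrongly reports a Utf8 column as UInt64, downcast_ref returns None and the unwrap() panics with exactly the message above.

use std::sync::Arc;
use datafusion::arrow::array::{Array, ArrayRef, StringArray, UInt64Array};

fn main() {
    // A column whose actual type is Utf8, standing in for a column that a
    // stale schema wrongly describes as UInt64 (hypothetical setup).
    let l: ArrayRef = Arc::new(StringArray::from(vec!["A", "BB"]));

    // The downcast that equal_rows_elem! performs when the schema claims UInt64:
    let left_array = l.as_any().downcast_ref::<UInt64Array>();

    // The runtime type is StringArray, so downcast_ref returns None; the
    // macro's `.unwrap()` then panics with
    // "called `Option::unwrap()` on a `None` value".
    assert!(left_array.is_none());
}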

@ozankabak (Contributor)

Thank you for the detailed explanation. This kind of thing should have a test; I will keep an eye on the fix PR to make sure we have one so we avoid regressions in the future.

@jonmmease (Contributor, Author)

Thanks @ozankabak,
I haven't been able to work out the root cause yet, so if anyone has any ideas I would appreciate it!

@jonmmease (Contributor, Author)

I think I have it sorted out in #4878
