
Fix the schema mismatch between logical and physical for aggregate function, add AggregateUDFImpl::is_null #11989

Merged: 23 commits from jayzhan211:schema-fix into apache:main on Aug 21, 2024

Conversation

@jayzhan211 (Contributor, author) commented on Aug 14, 2024:

Which issue does this PR close?

Closes #.
Part of #11782; it would be nice to clean up the schema before fighting with the physical name.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

let physical_input_schema_from_logical: Arc<Schema> =
logical_input_schema.as_ref().clone().into();

debug_assert_eq!(physical_input_schema_from_logical, physical_input_schema, "Physical input schema should be the same as the one converted from logical input schema. Please file an issue or send the PR");
@jayzhan211 (author) commented on Aug 14, 2024:

The main goal of the change is to ensure they are the same. Also, we pass physical_input_schema through the functions that require the input's schema.

A contributor replied:

Nice!

Did you consider making this function return an internal_error rather than debug_assert?

If we are concerned about breaking existing tests, we could add a config setting like datafusion.optimizer.skip_failed_rules to let users bypass the check
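(For illustration: a minimal sketch of the suggested internal-error variant, assuming datafusion_common's internal_err! macro and the schema names from the snippet above; this is not the PR's final code.)

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_common::{internal_err, Result};

// Hedged sketch: fail planning with an internal error instead of a
// debug-only assertion when the two schemas diverge.
fn check_schemas(
    physical_input_schema: &Arc<Schema>,
    physical_input_schema_from_logical: &Arc<Schema>,
) -> Result<()> {
    if physical_input_schema != physical_input_schema_from_logical {
        return internal_err!(
            "Physical input schema should be the same as the one \
             converted from logical input schema."
        );
    }
    Ok(())
}
```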

@jayzhan211 (author) replied on Aug 17, 2024:

The objective here is to ensure that the logical schema from ExprSchemable and the physical schema from ExecutionPlan::schema() are equivalent. If they are not, it indicates a potential schema mismatch issue. This is also why the code changes in this PR are mostly schema-related fixes; they are all required, so I don't think we should let users bypass the check 🤔

If we encounter inconsistent schemas, it raises an important question: Which schema should we use?

> Did you consider making this function return an internal_error rather than debug_assert?

It looks good to me

@@ -1599,11 +1603,10 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);

let schema: Schema = logical_input_schema.clone().into();
@jayzhan211 (author):

Workaround cleanup.

WindowFunctionDefinition::AggregateUDF(func) => {
// TODO: UDF should be able to customize nullability
if func.name() == "count" {
// TODO: there is issue unsolved for count with window, should return false
@jayzhan211 (author):

Not so familiar with window functions yet; leaving this as a TODO.

A contributor replied:

Perhaps we can file a ticket to track this -- ideally it would eventually be part of the window function definition itself rather than relying on names

use datafusion_physical_expr::window::WindowExpr;
use std::sync::Arc;

pub(crate) fn create_schema(
@jayzhan211 (author):

Moved the common function to utils; the logic is the same.

jayzhan211 marked this pull request as ready for review on August 15, 2024.
jayzhan211 requested a review from alamb on August 16, 2024.
@alamb (Contributor) left a review:

This seems like a good change to me, but I don't fully understand how it is all connected. Thank you for taking this on @jayzhan211

I am quite concerned about the use of unsafe but otherwise I think all this PR needs is some TODOs with ticket references and it would be good to go from my perspective.


@@ -328,10 +328,45 @@ impl ExprSchemable for Expr {
Ok(true)
}
}
Expr::WindowFunction(WindowFunction { fun, .. }) => {
Contributor:

Is this change required for this PR or is it a "drive by" improvement?

@jayzhan211 (author):

Required

}
}
Expr::ScalarFunction(ScalarFunction { func, args }) => {
// If all the element in coalesce is non-null, the result is non-null
Contributor:

We should probably add an API to ScalarUDFImpl to signal its null/non-nullness (as a follow on PR) instead of hard coding this function name

     func.is_nullable(args)

@@ -196,6 +196,10 @@ impl AggregateUDF {
self.inner.state_fields(args)
}

pub fn fields(&self, args: StateFieldsArgs) -> Result<Field> {
Contributor:

Could we document this function and what it is for (also in AggregateUDFImpl)?

Also, the name is strange to me -- it is fields but it returns a single Field, and the corresponding function on AggregateUDFImpl is called field (no s) 🤔

@@ -171,6 +171,9 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
fn get_minmax_desc(&self) -> Option<(Field, bool)> {
None
}

/// Get function's name, for example `count(x)` returns `count`
fn func_name(&self) -> &str;
Contributor:

Is there a reason this isn't name()? func_name is fine, it just seems inconsistent with the rest of the code.

@jayzhan211 (author) replied on Aug 17, 2024:

This is to identify the function (i.e. count). There is already name(), but it includes the arguments (i.e. count(x)), which is not what I want.
An alternative is to introduce nullable() for AggregateUDF so we don't need name checking; maybe I should have done that before this PR.

*union_nullable = *union_nullable || plan_field.is_nullable();

// Safety: Length is checked
unsafe {
Contributor:

I think this unsafe block is unnecessary -- this isn't a performance-critical piece of code. I think izip or just manually zipping three times would be better.
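(For reference: a hedged sketch of the izip-style alternative being suggested, assuming the itertools crate; the collection names union_datatypes / union_nullables / plan_fields are illustrative, not the PR's actual identifiers.)

```rust
use arrow::datatypes::{DataType, Field};
use itertools::izip;

// Hedged sketch: izip! stops at the shortest input, so together with the
// earlier length check there is no need for unchecked indexing.
fn merge_union_nullability(
    union_datatypes: &mut [DataType],
    union_nullables: &mut [bool],
    plan_fields: &[Field],
) {
    for (_union_datatype, union_nullable, plan_field) in
        izip!(union_datatypes.iter_mut(), union_nullables.iter_mut(), plan_fields)
    {
        *union_nullable = *union_nullable || plan_field.is_nullable();
    }
}
```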

@@ -80,6 +80,14 @@ impl WindowExpr for PlainAggregateWindowExpr {
}

fn field(&self) -> Result<Field> {
// TODO: Fix window function to always return non-null for count
Contributor:

I don't understand this comment -- can we please file a ticket to track it (and add the ticket reference to the comments)?

@@ -97,6 +97,10 @@ impl BuiltInWindowExpr {
}

impl WindowExpr for BuiltInWindowExpr {
fn func_name(&self) -> Result<&str> {
not_impl_err!("function name not determined")
Contributor:

why wouldn't we implement func_name for a built in window function 🤔

@jayzhan211 (author) replied:

The reason is that I don't need it; it is only used for name checking in nullable.

@jayzhan211 (author) added:

I think func_name is indeed another workaround; I would like to get rid of it before this PR is merged.

jayzhan211 marked this pull request as draft on August 17, 2024.
@@ -435,6 +459,10 @@ impl AggregateExpr for AggregateFunctionExpr {
.is_descending()
.and_then(|flag| self.field().ok().map(|f| (f, flag)))
}

fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
@jayzhan211 (author) commented on Aug 17, 2024:

I need to add a new function in 4 places; there might be room to improve 🤔

Maybe we don't need AggregateExpr at all, since there is only one implementation. I think a trait is useful when at least two implementations share similar functionality. Similar idea to #11810.

@@ -87,6 +89,10 @@ impl ScalarUDFImpl for ArrowCastFunc {
internal_err!("arrow_cast should return type from exprs")
}

fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
@jayzhan211 (author) commented on Aug 20, 2024:

This is added due to the test from #12050

@jayzhan211 (author) added:

It seems more functions need to customize nullability based on the input's nullability. That indicates we have too few non-null test cases, but we can add them gradually.

WindowFunctionDefinition::BuiltInWindowFunction(func) => {
if func.name() == "RANK"
|| func.name() == "NTILE"
|| func.name() == "CUME_DIST"
Contributor:

I am not sure if this list is complete. What about DenseRank and PercentRank?

@jayzhan211 (author) replied:

This is temporary code; there will be no name checking after #8709 is done. We can see that Row_Number is already gone.

/// Returns default value of the function given the input is Null
/// Most of the aggregate function return Null if input is Null,
/// while `count` returns 0 if input is Null
fn default_value(&self, data_type: &DataType) -> Result<ScalarValue>;
Contributor:

👍🏻

/// while `count` returns 0 if input is Null
fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
ScalarValue::try_from(data_type)
}
}
Contributor:

If an AggregateUDFImpl overrides is_nullable() to return false but does not set the default_value(), it seems that it would indicate it is not nullable, yet its default_value() would return null. Is there a way to prevent this behavior? 🤔

@jayzhan211 (author) replied:

In this case, it should at least fail when creating the record batch, since the schema indicates non-null but a null value is produced.

If you mean preventing the panic, that is a good question; I don't have an answer.

Contributor:

Maybe we can improve the documentation?
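(For illustration: a hedged sketch of the count-style override that the doc comment describes, written as a free function standing in for the trait method; not the PR's actual code.)

```rust
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

// Hedged sketch: a count-like aggregate that reports itself as non-nullable
// should also return a non-Null default (0) so the two stay consistent.
fn count_default_value(_data_type: &DataType) -> Result<ScalarValue> {
    Ok(ScalarValue::Int64(Some(0)))
}
```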

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Contributor:

Can you move them back to the top?

@jayzhan211 (author) replied:

I would appreciate it a lot if there were a rustfmt.toml, so we would just need cargo fmt and the import style would stay consistent.

Contributor:

I agree. We should automate this

Contributor:

Yeah, I agree as well -- if we care about a consistent location / import order / style of use statements, let's get it in some automated tool that can be run as part of CI rather than try to enforce it manually during code review

I often see PRs that rearrange the imports and I suspect it is some IDE setting. Having it inconsistent makes merge conflicts more common but otherwise is not a problem in my mind
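(For reference: a hedged sketch of the kind of rustfmt.toml that could enforce this; both options are currently unstable and require a nightly rustfmt, e.g. cargo +nightly fmt.)

```toml
# Hypothetical rustfmt.toml sketch; both options are unstable/nightly-only.
group_imports = "StdExternalCrate"  # std imports first, then external crates, then crate-local
imports_granularity = "Module"      # merge imports that share a module path
```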

use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

Contributor:

Can you move them back to the top?

@@ -35,6 +30,9 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
use datafusion_expr::WindowFrame;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

Contributor:

Can you move them back to the top?


jayzhan211 merged commit 6786f15 into apache:main on Aug 21, 2024; 24 checks passed.
jayzhan211 deleted the schema-fix branch on August 21, 2024.
@jayzhan211 (author):

Thanks @alamb @berkaysynnada @ozankabak

@phillipleblanc (Contributor):

@jayzhan211 I'm in the process of upgrading spiceai to use DataFusion 42 and I'm running into the schema mismatch error from this PR:

Internal("Physical input schema should be the same as the one converted from logical input schema.")

I have a custom TableProvider and I get the error when running a SELECT COUNT(1) FROM my_table. This is what the explain plan looked like on DF 41:

let expected_plan = [
        "+---------------+--------------------------------------------------------------------------------+",
        "| plan_type     | plan                                                                           |",
        "+---------------+--------------------------------------------------------------------------------+",
        "| logical_plan  | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]                              |",
        "|               |   BytesProcessedNode                                                           |",
        "|               |     TableScan: non_federated_abc projection=[]                                 |",
        "| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]                      |",
        "|               |   CoalescePartitionsExec                                                       |",
        "|               |     AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]                |",
        "|               |       BytesProcessedExec                                                       |",
        "|               |         SchemaCastScanExec                                                     |",
        "|               |           RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 |",
        "|               |             SqlExec sql=SELECT \"id\", \"created_at\" FROM non_federated_abc       |",
        "|               |                                                                                |",
        "+---------------+--------------------------------------------------------------------------------+",
    ];

(BytesProcessedNode and BytesProcessedExec are custom operators that we inject for tracking the number of bytes processed; I don't think it's relevant to this bug, but I initially had a similar schema check for it that I ended up removing for the reason below.)

My assumption of what is going on here is that, logically, no columns are required for the logical plan to come up with the count of the number of rows, but the TableProvider has to return all of the columns because it needs the rows to perform the count aggregation. The columns end up being thrown away because they get erased in the aggregation. Thus the check that the physical schema and the logical schema are equal is not strictly needed for this plan. Does that sound right?

@alamb (Contributor) commented on Sep 19, 2024:

@itsjunetime is also having some issues related to this ticket during our upgrade of DataFusion; I am not sure if they are related.

@jayzhan211 (author) replied:

What are the logical schema and physical schema you have (in the error)?

I think they should be consistent even for a count(*) statement. They (logical & physical) should either both have all the columns or both have no columns.

@phillipleblanc (Contributor):

I added dbg! statements to both, and I see that one has the fields while the one from the logical plan is empty:

[/Users/phillip/code/apache/datafusion/datafusion/core/src/physical_planner.rs:676:21] &physical_input_schema = Schema {
    fields: [
        Field {
            name: "id",
            data_type: Utf8,
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
        Field {
            name: "created_at",
            data_type: Timestamp(
                Nanosecond,
                Some(
                    "+00:00",
                ),
            ),
            nullable: true,
            dict_id: 0,
            dict_is_ordered: false,
            metadata: {},
        },
    ],
    metadata: {},
}
[/Users/phillip/code/apache/datafusion/datafusion/core/src/physical_planner.rs:677:21] &physical_input_schema_from_logical = Schema {
    fields: [],
    metadata: {},
}
thread 'acceleration::query_push_down::acceleration_with_and_without_federation' panicked at crates/runtime/tests/acceleration/query_push_down.rs:218:10:
collect working: Internal("Physical input schema should be the same as the one converted from logical input schema.")

@jayzhan211 (author) commented on Sep 19, 2024:

I think the ideal way is to have something like a wildcard field for both logical and physical, but this requires modifying Schema 🤔

> logically no columns are required for the logical plan to come up with the count of the number of rows

Could we keep all the fields for the logical plan to make them consistent?

@phillipleblanc (Contributor) replied:

> Could we keep all the fields for the logical plan to make them consistent?

That seems fine to me.

@itsjunetime (Contributor):

> What is the logical schema and physical schema you have (the error)?

I'm getting issues where both schemas are equivalent except for the metadata on the fields. I think (correct me if I'm wrong) that field metadata doesn't actually need to be equivalent for the invariant that this error is trying to catch to be upheld. I think we could comfortably switch from directly comparing the schemas with PartialEq to using equivalent_names_and_types or something like that. I can file a ticket and reproducer/fix for this.

I'm also seeing issues that don't propagate with this exact error (but rather complain that count(*) is a non-nullable column that contains null values), so once I've figured out more specifics for this issue I can file another ticket for it.
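(For illustration: a hedged sketch of the metadata-insensitive comparison being proposed here. This stands in for, and is not, DataFusion's actual equivalent_names_and_types.)

```rust
use arrow::datatypes::Schema;

// Hedged sketch: compare field names, types, and nullability, but deliberately
// ignore field metadata when deciding whether two schemas "match".
fn same_names_and_types(a: &Schema, b: &Schema) -> bool {
    a.fields().len() == b.fields().len()
        && a.fields()
            .iter()
            .zip(b.fields().iter())
            .all(|(fa, fb)| {
                fa.name() == fb.name()
                    && fa.data_type() == fb.data_type()
                    && fa.is_nullable() == fb.is_nullable()
            })
}
```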

@jayzhan211 (author) replied:

> I think (correct me if I'm wrong) that field metadata doesn't actually need to be equivalent for the invariant that this error is trying to catch to be upheld

If the metadata is mismatched, it indicates we lost the metadata somewhere while passing the schema info through, so I think it makes sense to check the metadata too. Maybe we should first figure out why the metadata is mismatched.

wiedld pushed a commit to influxdata/arrow-datafusion that referenced this pull request on Oct 4, 2024:

Fix the schema mismatch between logical and physical for aggregate function, add `AggregateUDFImpl::is_null` (apache#11989)

* schema assertion and fix the mismatch from logical and physical
* add more msg
* cleanup
* rm test1
* nullable for scalar func
* nullable
* rm field
* rm unsafe block and use internal error
* rm func_name
* rm nullable option
* add test
* add more msg
* fix test
* rm row number
* Update datafusion/expr/src/udaf.rs
* Update datafusion/expr/src/udaf.rs
* fix failed test from apache#12050
* cleanup
* add doc

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Labels: core (Core DataFusion crate), functions, logical-expr (Logical plan and expressions), optimizer (Optimizer rules), physical-expr (Physical Expressions), sql (SQL Planner), sqllogictest (SQL Logic Tests (.slt))