Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix push_down_filter for pushing filters on grouping columns rather than aggregate columns #4447

Merged
merged 7 commits into from
Dec 3, 2022

Conversation

jackwener
Copy link
Member

Which issue does this PR close?

Closes #4401.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the optimizer Optimizer rules label Dec 1, 2022
Comment on lines -646 to -649
|| !columns
.intersection(&used_columns)
.collect::<HashSet<_>>()
.is_empty()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original performance was bad

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the last PR, I think we do not need to check the aggregate Exprs, but just check the group by Exprs. In some cases, the same column can exist in both aggregate Exprs and group by Exprs, for example select count(distinct col_a), col_a from table group by col_a; . If there is a Filter applied to col_a, the Filter can still be pushed down even it is referred by the agg Exprs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic should check all the columns used by the Filter predicate is the subset of the group by Exprs output Columns.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For push_down_filter through Agg, we can push Expr in groupby_expr.
Has add it.

@jackwener jackwener mentioned this pull request Dec 1, 2022
@@ -910,11 +922,9 @@ mod tests {
// rewrite to CNF
// (c = 1 OR c = 1) [can pushDown] AND (c = 1 OR b > 3) AND (b > 2 OR C = 1) AND (b > 2 OR b > 3)

let expected = "\
Filter: (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\
let expected = "Filter: (test.c = Int64(1) OR test.c = Int64(1)) AND (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 913 to 917
let expected = "\
Filter: (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\
let expected = "Filter: (test.c = Int64(1) OR test.c = Int64(1)) AND (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\
\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\
\n Filter: test.c = Int64(1) OR test.c = Int64(1)\
\n TableScan: test";
Copy link
Member Author

@jackwener jackwener Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original plan is wrong.😂
I think we need to delete this wrong UT.

Filter include column that not in output of Aggregate.

Copy link
Member

@Ted-Jiang Ted-Jiang Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it was my fault.
col_c should not exist in filter. Need delete it 😂
@jackwener

@mingmwang
Copy link
Contributor

LGTM.

@jackwener
Copy link
Member Author

jackwener commented Dec 3, 2022

@alamb @Dandandan PTAL

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me -- thank you @jackwener

}
}

let child = match conjunction(push_predicates) {
// As for plan Filter: Column(a+b) > 0 -- Agg: groupby:[Column(a)+Column(b)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice -- this is getting quite sophisticated.

// So we need create a replace_map, add {`a+b` --> Expr(Column(a)+Column(b))}
let mut replace_map = HashMap::new();
for expr in &agg.group_expr {
replace_map.insert(expr.display_name()?, expr.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checked that display_name is the right one: https://docs.rs/datafusion/14.0.0/datafusion/prelude/enum.Expr.html#method.display_name 👍

\n TableScan: test";
let expected =
"Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\
\n Filter: test.b + test.a > Int64(10)\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 very nice

\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\
\n Filter: test.c = Int64(1) OR test.c = Int64(1)\
\n TableScan: test";
Filter: b > Int64(10)\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this new plan is correct

let expected = "Projection: c, COUNT(UInt8(1))\
\n Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
\n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
\n Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb changed the title fix push_down_filter push Expr instead of column. fix push_down_filter for pushing filters on grouping columns rather than aggregate columns Dec 3, 2022
@alamb alamb merged commit 0509692 into apache:master Dec 3, 2022
@ursabot
Copy link

ursabot commented Dec 3, 2022

Benchmark runs are scheduled for baseline = 8db99d2 and contender = 0509692. 0509692 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@jackwener jackwener deleted the fix_bug branch December 6, 2022 15:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
optimizer Optimizer rules
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimization rule filter_push_down causes FieldNotFound error
6 participants