Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement groups accumulator for stddev and variance #12095

Merged
merged 4 commits into from
Aug 26, 2024

Conversation

eejbyfeldt
Copy link
Contributor

@eejbyfeldt eejbyfeldt commented Aug 21, 2024

Which issue does this PR close?

Closes #12094.

Rationale for this change

Hopefully improve performance of queries using stddev and variance aggregates.

I did not find any of the current benchmarks containing stddev and/or variance. So I modified clickbench query 31 and 32 to use STDDEV instead of AVG. And these were the results:

Before

Running benchmarks with the following options: RunOpt { query: None, common: CommonOpt { iterations: 5, partitions: None, batch_size: 8192, debug: false, string_view: false }, path: "/home/eejbyfeldt/dev/apache/datafusion/benchmarks/data/hits.parquet", queries_path: "/home/eejbyfeldt/dev/apache/datafusion/benchmarks/queries/clickbench/queries.sql", output_path: Some("/home/eejbyfeldt/dev/apache/datafusion/benchmarks/results/implement-groups-accumulator-for-stddev/clickbench_1.json") }
Q0: SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), STDDEV("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10;
Query 0 iteration 0 took 3945.8 ms and returned 10 rows
Query 0 iteration 1 took 3993.8 ms and returned 10 rows
Query 0 iteration 2 took 4016.9 ms and returned 10 rows
Query 0 iteration 3 took 3991.3 ms and returned 10 rows
Query 0 iteration 4 took 4024.2 ms and returned 10 rows
Q1: SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), STDDEV("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
Query 1 iteration 0 took 6551.8 ms and returned 10 rows
Query 1 iteration 1 took 6642.9 ms and returned 10 rows
Query 1 iteration 2 took 6618.6 ms and returned 10 rows
Query 1 iteration 3 took 6680.7 ms and returned 10 rows
Query 1 iteration 4 took 6568.3 ms and returned 10 rows
Done

After

Running benchmarks with the following options: RunOpt { query: None, common: CommonOpt { iterations: 5, partitions: None, batch_size: 8192, debug: false, string_view: false }, path: "/home/eejbyfeldt/dev/apache/datafusion/benchmarks/data/hits.parquet", queries_path: "/home/eejbyfeldt/dev/apache/datafusion/benchmarks/queries/clickbench/queries.sql", output_path: Some("/home/eejbyfeldt/dev/apache/datafusion/benchmarks/results/implement-groups-accumulator-for-stddev/clickbench_1.json") }
Q0: SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), STDDEV("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10;
Query 0 iteration 0 took 2144.0 ms and returned 10 rows
Query 0 iteration 1 took 2155.1 ms and returned 10 rows
Query 0 iteration 2 took 2095.3 ms and returned 10 rows
Query 0 iteration 3 took 2100.2 ms and returned 10 rows
Query 0 iteration 4 took 2238.7 ms and returned 10 rows
Q1: SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), STDDEV("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
Query 1 iteration 0 took 3075.6 ms and returned 10 rows
Query 1 iteration 1 took 3062.4 ms and returned 10 rows
Query 1 iteration 2 took 3081.9 ms and returned 10 rows
Query 1 iteration 3 took 3105.1 ms and returned 10 rows
Query 1 iteration 4 took 3101.6 ms and returned 10 rows
Done

What changes are included in this PR?

An implementation of GroupsAccumulator for the stddev and variance aggregates.

It extracts the core logic from NullState::acummulate to a free function accumulate to make it reusable when implementing GroupsAccumulators that do not need separate null tracking as can be determine from some other value (in this case the count).

Are these changes tested?

Added more test cases in aggregates.slt

Are there any user-facing changes?

No.

@andygrove
Copy link
Member

Thanks @eejbyfeldt. Do you have any before/after benchmark results?

@eejbyfeldt
Copy link
Contributor Author

eejbyfeldt commented Aug 23, 2024

Thanks @eejbyfeldt. Do you have any before/after benchmark results?

Yes some benchmarks sounds like a great idea. I updated the PR description with something basic queries.

FAs far as I could tell stddev/varaiance was not used in any current benchmarks, so I modified two clickbench queries to use stddev instead of avg. The queries were choosen based on them having big improvements when the GroupsAccumulator api was first implemented (according to https://www.influxdata.com/blog/aggregating-millions-groups-fast-apache-arrow-datafusion/) so they are probably showing best case improvement.

@andygrove Let me know if you think we should be doing something more advanced.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @eejbyfeldt and @andygrove -- I reviewed this PR carefully and I think it is really nicely written and tested. I had some small suggestions on adding comments, but otherwise it is really nice: it uses the existing code beautifully.

🚀

I also made a PR with a benchmark for STDDEV and VAR in #12146

Using that benchmark, I confirmed @eejbyfeldt 's results that this PR makes stddev and variance significantly faster.

On main
Elapsed 0.451 seconds.
Elapsed 0.469 seconds.

With this branch
Elapsed 0.331 seconds.
Elapsed 0.336 seconds.

datafusion/functions-aggregate/src/variance.rs Outdated Show resolved Hide resolved
emit_to: datafusion_expr::EmitTo,
) -> (Vec<f64>, NullBuffer) {
let mut counts = emit_to.take_needed(&mut self.counts);
let _ = emit_to.take_needed(&mut self.means);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it took me a while to understand why self.means was ignored in the final calculation (and thus is there any need to carry it through).

However, with some study I see that the means are needed to calculate m2 during accumulation but are not used in the final output

Maybe we could point that out in a comment (I can do it as a follow on PR too)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in 0f1b64c

*count -= 1;
});
}
let nulls = NullBuffer::from_iter(counts.iter().map(|&count| count != 0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 it took me a while to figure out how nulls were handled -- LGTM

@@ -500,6 +500,85 @@ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq
----
0.950438495292

# csv_query_stddev_7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for test coverage

emit_to: datafusion_expr::EmitTo,
) -> (Vec<f64>, NullBuffer) {
let mut counts = emit_to.take_needed(&mut self.counts);
let _ = emit_to.take_needed(&mut self.means);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps there could be a take_needed that doesn't generate / return the Vec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I think it won't gain much, so probably better to leave as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there is probably no performance gains as it would need to do the same work as take_needed.

I guess it might be worth adding if this becomes a common pattern in more implementations.

@eejbyfeldt eejbyfeldt force-pushed the implement-groups-accumulator-for-stddev branch from 525106d to 0f1b64c Compare August 25, 2024 07:03
@alamb
Copy link
Contributor

alamb commented Aug 25, 2024

I'll plan to merge this tomorrow unless there are additional comments

@alamb
Copy link
Contributor

alamb commented Aug 26, 2024

Thanks again @eejbyfeldt and @Dandandan

@alamb alamb merged commit 1b875f4 into apache:main Aug 26, 2024
24 checks passed
eejbyfeldt added a commit to eejbyfeldt/datafusion that referenced this pull request Sep 25, 2024
The bug was in the orginal implementation in apache#12095. This fixes the
issue and modify a test case such that it would have caught it.
alamb pushed a commit that referenced this pull request Sep 25, 2024
The bug was in the orginal implementation in #12095. This fixes the
issue and modify a test case such that it would have caught it.
bgjackma pushed a commit to bgjackma/datafusion that referenced this pull request Sep 25, 2024
The bug was in the orginal implementation in apache#12095. This fixes the
issue and modify a test case such that it would have caught it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
functions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement GroupsAccumulator for stddev and var aggregaters
4 participants