
Add batch operations to stddev #1547

Merged
merged 29 commits on Jan 11, 2022

Conversation

realno
Contributor

@realno realno commented Jan 11, 2022

Which issue does this PR close?

Closes #1546.

Rationale for this change

To make the calculation more efficient, we want to implement the batch methods.

What changes are included in this PR?

Are there any user-facing changes?

No user-facing changes.

No breaking changes.
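For context, the accumulator state discussed in this PR is a running (count, mean, m2) triple, updated per value with Welford's online algorithm. A minimal, self-contained sketch of the per-value update (the `VarState` struct here is hypothetical and simplified, not DataFusion's actual `VarianceAccumulator`):

```rust
// Hypothetical, simplified accumulator illustrating the per-value
// (Welford) update that the batch methods apply to each non-null element.
#[derive(Debug, Default)]
struct VarState {
    count: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the mean
}

impl VarState {
    fn update(&mut self, value: f64) {
        let new_count = self.count + 1;
        let delta = value - self.mean;
        let new_mean = self.mean + delta / new_count as f64;
        let delta2 = value - new_mean;
        self.count = new_count;
        self.mean = new_mean;
        self.m2 += delta * delta2;
    }

    // Population variance; sample variance would divide by count - 1.
    fn variance_pop(&self) -> f64 {
        self.m2 / self.count as f64
    }
}

fn main() {
    let mut s = VarState::default();
    for v in [1.0, 2.0, 3.0, 4.0] {
        s.update(v);
    }
    println!("{}", s.variance_pop()); // prints 1.25 (mean 2.5, m2 = 5.0)
}
```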

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jan 11, 2022
@realno realno mentioned this pull request Jan 11, 2022
@realno
Contributor Author

realno commented Jan 11, 2022

@alamb per our discussion, this is to add batch operations for stddev and var.

@alamb
Contributor

alamb commented Jan 11, 2022

Thank you @realno -- I will try and review this later today

Contributor

@alamb alamb left a comment

Looks good to me -- thanks @realno

This PR makes it very clear that the Aggregate API is very confusing.

For aggregates, only update_batch and merge_batch are needed. The extra implementations of update and merge in terms of ScalarValue are never called (I actually deleted the implementations of update and merge locally and all the tests still passed)

Thus, my plan is to:

  1. Make a follow on PR that removes the implementation of Variance in terms of ScalarValue (as well as the supporting math functions)
  2. Create a proposed PR that removes update and merge completely in favor of some adapter functions and documentation

Thanks again for this very high quality work 🏅

Comment on lines +260 to +266
for i in 0..arr.len() {
    let value = arr.value(i);

    if value == 0_f64 && values.is_null(i) {
        continue;
    }
    let new_count = self.count + 1;
Contributor

Here is a more idiomatic way to iterate over the array and skip nulls (and also faster, as it doesn't check the bounds on each access to arr.value(i)):

Suggested change
-for i in 0..arr.len() {
-    let value = arr.value(i);
-    if value == 0_f64 && values.is_null(i) {
-        continue;
-    }
-    let new_count = self.count + 1;
+// NB: filter_map skips `None` (null) values
+for value in arr.iter().filter_map(|v| v) {
+    let new_count = self.count + 1;

Contributor Author

Great suggestion!

Contributor Author

@realno realno Jan 11, 2022

After some investigation, this approach doesn't work as expected. The reason for the null check is that downcast_ref replaces the None values with 0_f64, so we need to check the original array whenever a 0 is observed. The proposed code checks the array after the type cast, so it can't catch the nulls. I tried to find a good way to do something similar on the original array but haven't had any luck yet. I will dig a bit deeper later; please let me know if you know a way to achieve this.

Contributor

After some investigation, this approach doesn't work as expected.

FWIW I tried making these changes locally and all the tests passed for me

The reason for the null check is because downcast_ref replace the None values into 0_f64 so we need to check in the original array when a 0 is observed.

I am not sure this is accurate. The way arrow works is that the values and "validity" are tracked in separate structures.

Thus for elements that are NULL there is some arbitrary value in the array (which will likely be 0.0f, though that is not guaranteed by the arrow spec).

The construct of arr.iter() returns an iterator of Option<f64> that is None if the element is NULL, and Some(f64_value) if the element is non-NULL.

The use of filter_map then filters out the None elements, somewhat non-obviously.

This

        for value in arr.iter().filter_map(|v| v) {

is effectively the same as

        for value in arr.iter() {
            let value = match value {
                Some(v) => v,
                None => continue,
            };
            // ...
        }

So I actually think there is a bug in this code as written with nulls -- the check should be

            if values.is_null(i) {

Rather than

            if value == 0_f64 && values.is_null(i) {

(as null values are not guaranteed to be 0.0f)
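To see the two forms side by side in runnable code, here is a self-contained sketch in which a plain Vec<Option<f64>> stands in for arrow's Float64Array (whose iterator likewise yields Option<f64>, with None for NULL slots). The helpers sum_non_null and sum_non_null_explicit are hypothetical, purely for illustration:

```rust
// A Vec<Option<f64>> stands in for arrow's Float64Array; both functions
// skip the null (None) entries while keeping a genuine 0.0 value.
fn sum_non_null(values: &[Option<f64>]) -> f64 {
    // filter_map(|v| v) drops the None entries and unwraps the Some
    values.iter().copied().filter_map(|v| v).sum()
}

fn sum_non_null_explicit(values: &[Option<f64>]) -> f64 {
    let mut total = 0.0;
    for value in values {
        let value = match value {
            Some(v) => *v,
            None => continue,
        };
        total += value;
    }
    total
}

fn main() {
    let data = vec![Some(1.0), None, Some(0.0), Some(2.5)];
    // Both skip the NULL but keep the real 0.0 value.
    assert_eq!(sum_non_null(&data), 3.5);
    assert_eq!(sum_non_null_explicit(&data), 3.5);
}
```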

Contributor Author

Thanks for clarifying, I will do some more testing locally and follow up with a PR (or more questions :D).

Comment on lines +285 to +290
for i in 0..counts.len() {
    let c = counts.value(i);
    if c == 0_u64 {
        continue;
    }
    let new_count = self.count + c;
Contributor

Suggested change
-for i in 0..counts.len() {
-    let c = counts.value(i);
-    if c == 0_u64 {
-        continue;
-    }
-    let new_count = self.count + c;
+let non_null_counts = counts
+    .iter()
+    .enumerate()
+    .filter_map(|(i, c)| c.map(|c| (i, c)));
+for (i, c) in non_null_counts {
+    let new_count = self.count + c;

By the same logic as above, this also skips the bounds check on each row. Though for sure I would say this is less readable :(
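The suggested pattern can be sketched in a self-contained form, with a Vec<Option<u64>> standing in for the arrow UInt64Array (non_null_counts here is a hypothetical helper, not DataFusion code):

```rust
// Iterate a nullable column together with its indices, skipping null slots.
// c.map(|c| (i, c)) turns (i, Some(c)) into Some((i, c)) and (i, None)
// into None, which filter_map then drops.
fn non_null_counts(counts: &[Option<u64>]) -> Vec<(usize, u64)> {
    counts
        .iter()
        .copied()
        .enumerate()
        .filter_map(|(i, c)| c.map(|c| (i, c)))
        .collect()
}

fn main() {
    let counts = vec![Some(3), None, Some(0), Some(5)];
    // The null slot at index 1 is dropped; the zero at index 2 is kept.
    assert_eq!(non_null_counts(&counts), vec![(0, 3), (2, 0), (3, 5)]);
}
```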

Contributor Author

Great suggestion! For this part of the code the length of the array is pretty small (the number of partitions to merge), so maybe we can opt for readability here.

Contributor

seems reasonable to me
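For reference, merging two partial (count, mean, m2) states as discussed in this thread amounts to the standard parallel-variance combination (Chan et al.). A sketch with a hypothetical tuple-based state, not DataFusion's actual merge_batch:

```rust
// Merge two partial variance states (count, mean, m2) into one,
// using the parallel combination formula of Chan et al.
fn merge(a: (u64, f64, f64), b: (u64, f64, f64)) -> (u64, f64, f64) {
    let (n_a, mean_a, m2_a) = a;
    let (n_b, mean_b, m2_b) = b;
    let n = n_a + n_b;
    if n == 0 {
        return (0, 0.0, 0.0);
    }
    let delta = mean_b - mean_a;
    let mean = mean_a + delta * n_b as f64 / n as f64;
    let m2 = m2_a + m2_b + delta * delta * (n_a as f64 * n_b as f64) / n as f64;
    (n, mean, m2)
}

fn main() {
    // Partition A holds [1.0, 2.0]; partition B holds [3.0, 4.0].
    let a = (2, 1.5, 0.5); // count, mean, sum of squared deviations
    let b = (2, 3.5, 0.5);
    let (n, mean, m2) = merge(a, b);
    // Same state as processing [1.0, 2.0, 3.0, 4.0] in a single pass.
    assert_eq!(n, 4);
    assert!((mean - 2.5).abs() < 1e-12);
    assert!((m2 - 5.0).abs() < 1e-12);
}
```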

@@ -209,8 +214,8 @@ impl AggregateExpr for VariancePop {

#[derive(Debug)]
pub struct VarianceAccumulator {
-    m2: ScalarValue,
-    mean: ScalarValue,
+    m2: f64,
Contributor

👍

@alamb
Contributor

alamb commented Jan 11, 2022

See #1550 for a cleanup of some of this code.

@realno realno deleted the add-batch-operations-to-stddev branch February 9, 2022 20:09
Labels
datafusion Changes in the datafusion crate

Successfully merging this pull request may close these issues.

Implement batch_update and batch_merge for var and stddev operator
3 participants