Add batch operations to stddev #1547
@alamb per our discussion, this is to add batch operations for stddev and var.
Thank you @realno -- I will try and review this later today
Looks good to me -- thanks @realno
This PR makes it very clear that the Aggregate API is very confusing.
For aggregates, only update_batch and merge_batch are needed. The extra implementations of update and merge in terms of ScalarValue are never called (I actually deleted the implementations of update and merge locally and all the tests still pass).
Thus, my plan is to:
- Make a follow-on PR that removes the implementation of Variance in terms of ScalarValue (as well as the supporting math functions)
- Create a proposed PR that removes update and merge completely in favor of some adapter functions and documentation
Thanks again for this very high quality work 🏅
for i in 0..arr.len() {
    let value = arr.value(i);

    if value == 0_f64 && values.is_null(i) {
        continue;
    }
    let new_count = self.count + 1;
Here is a more idiomatic way to iterate over the array and skip nulls (it is also faster, as it doesn't check the bounds on each access to arr.value(i)):
-for i in 0..arr.len() {
-    let value = arr.value(i);
-    if value == 0_f64 && values.is_null(i) {
-        continue;
-    }
-    let new_count = self.count + 1;
+// NB: filter map skips `None` (null) values
+for value in arr.iter().filter_map(|v| v) {
+    let new_count = self.count + 1;
Great suggestion!
After some investigation, this approach does work as expected. The reason for the null check is that downcast_ref replaces the None values with 0_f64, so we need to check the original array when a 0 is observed. The proposed code checks the array after the type cast, so it can't catch the nulls. I tried to find a good way to do something similar on the original array but have yet to have any luck. I will dig a bit deeper later; please let me know if you know a way to achieve this.
> After some investigation, this approach does work as expected.
FWIW I tried making these changes locally and all the tests passed for me
> The reason for the null check is because downcast_ref replace the None values into 0_f64 so we need to check in the original array when a 0 is observed.
I am not sure this is accurate. The way arrow works is that the values and "validity" are tracked in separate structures.
Thus for elements that are NULL there is some arbitrary value in the array (which will likely be 0.0, though that is not guaranteed by the arrow spec).
The construct arr.iter() returns an iterator of Option<f64> that is None if the element is NULL, and Some(f64_value) if the element is non-NULL.
The use of filter_map then filters out the None elements, somewhat non-obviously.
This:
    for value in arr.iter().filter_map(|v| v) {
is effectively the same as:
    for value in arr.iter() {
        let value = match value {
            Some(v) => v,
            None => continue,
        };
So I actually think there is a bug in this code as written with nulls -- the check should be
    if values.is_null(i) {
rather than
    if value == 0_f64 && values.is_null(i) {
(as null values are not guaranteed to be 0.0)
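To make the point about separately tracked values and validity concrete, here is a minimal sketch that does not use the arrow crate at all. `NullableF64`, `sum_buggy` and `sum_skip_nulls` are hypothetical names invented for this illustration; the struct only imitates the idea of a values buffer plus a validity mask and is not Arrow's actual layout or API.

```rust
/// Hypothetical stand-in for a nullable float array: the values and the
/// validity mask live in separate vectors (an assumption for illustration,
/// not the real Arrow memory layout).
struct NullableF64 {
    values: Vec<f64>,
    validity: Vec<bool>, // true means the slot is non-null
}

impl NullableF64 {
    fn is_null(&self, i: usize) -> bool {
        !self.validity[i]
    }

    fn value(&self, i: usize) -> f64 {
        // Returns whatever is stored in the slot, even if the slot is null.
        self.values[i]
    }

    /// Yields `None` for null slots and `Some(v)` otherwise.
    fn iter(&self) -> impl Iterator<Item = Option<f64>> + '_ {
        self.values
            .iter()
            .zip(self.validity.iter())
            .map(|(v, ok)| if *ok { Some(*v) } else { None })
    }
}

/// Skips a null only when its slot happens to hold 0.0, mirroring the
/// check questioned in the review.
fn sum_buggy(arr: &NullableF64) -> f64 {
    let mut sum = 0.0;
    for i in 0..arr.values.len() {
        let value = arr.value(i);
        if value == 0_f64 && arr.is_null(i) {
            continue;
        }
        sum += value;
    }
    sum
}

/// Skips every null regardless of what the slot holds.
fn sum_skip_nulls(arr: &NullableF64) -> f64 {
    arr.iter().flatten().sum()
}
```

With a null slot that holds a non-zero placeholder, `sum_buggy` counts the placeholder while `sum_skip_nulls` ignores it. `flatten()` on an iterator of `Option` behaves like `filter_map(|v| v)`.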
Thanks for clarifying, I will do some more testing locally and follow up with a PR (or more questions :D).
for i in 0..counts.len() {
    let c = counts.value(i);
    if c == 0_u64 {
        continue;
    }
    let new_count = self.count + c;
-for i in 0..counts.len() {
-    let c = counts.value(i);
-    if c == 0_u64 {
-        continue;
-    }
-    let new_count = self.count + c;
+let non_null_counts = counts
+    .iter()
+    .enumerate()
+    .filter_map(|(i, c)| c.map(|c| (i, c)));
+for (i, c) in non_null_counts {
+    let new_count = self.count + c;
By the same logic as above this also skips checking bounds on each row. Though for sure I would say this is less readable :(
Great suggestion! For this part of the code the length of the array is pretty small (number of partitions to merge), so maybe we can opt for readability here.
seems reasonable to me
@@ -209,8 +214,8 @@ impl AggregateExpr for VariancePop {

 #[derive(Debug)]
 pub struct VarianceAccumulator {
-    m2: ScalarValue,
-    mean: ScalarValue,
+    m2: f64,
👍
See #1550 for a cleanup of some of this code.
Which issue does this PR close?
Closes #1546 .
Rationale for this change
For more efficient calculation, we want to implement the batch methods.
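As a rough illustration of what batch methods for variance can look like, the sketch below keeps a running `count`, `mean` and `m2` (the same field names that appear in the diff), updates them from a batch using Welford's online algorithm, and merges two partial states with the standard pairwise formula. `VarianceState`, its method names and the use of `&[Option<f64>]` as the batch type are assumptions made for this example; this is not the code in this PR and not the DataFusion accumulator API.

```rust
/// Hypothetical running state for a variance computation.
#[derive(Debug, Clone, Copy, Default)]
struct VarianceState {
    count: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the current mean
}

impl VarianceState {
    /// Folds a batch of nullable values into the state (Welford update).
    /// `None` stands in for a null and is skipped.
    fn update_batch(&mut self, values: &[Option<f64>]) {
        for value in values.iter().copied().flatten() {
            let new_count = self.count + 1;
            let delta1 = value - self.mean;
            let new_mean = self.mean + delta1 / new_count as f64;
            let delta2 = value - new_mean;
            self.m2 += delta1 * delta2;
            self.count = new_count;
            self.mean = new_mean;
        }
    }

    /// Combines another partial state into this one, for example the
    /// state produced by a different partition.
    fn merge(&mut self, other: &VarianceState) {
        if other.count == 0 {
            return; // nothing to merge, and avoids dividing by zero
        }
        let new_count = self.count + other.count;
        let delta = other.mean - self.mean;
        let new_mean = self.mean + delta * other.count as f64 / new_count as f64;
        self.m2 += other.m2
            + delta * delta * self.count as f64 * other.count as f64 / new_count as f64;
        self.count = new_count;
        self.mean = new_mean;
    }

    /// Population variance, or `None` when no values have been seen.
    fn variance_pop(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.m2 / self.count as f64)
        }
    }
}
```

Updating one state with a whole batch and merging two states built from halves of the same data should agree up to floating-point rounding, which is the property that lets partial results from separate partitions be combined.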
What changes are included in this PR?
Are there any user-facing changes?
No user-facing changes.
No breaking changes.