diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 455fc5fec450..a0475fe8e446 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -91,36 +91,9 @@ impl NullState { /// * `opt_filter`: if present, only rows for which is Some(true) are included /// * `value_fn`: function invoked for (group_index, value) where value is non null /// - /// # Example + /// See [`accumulate`], for more details on how value_fn is called /// - /// ```text - /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ - /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ - /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ - /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ - /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ - /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ - /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ - /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ - /// │ └─────┘ │ │ └─────┘ │ └─────┘ - /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ - /// - /// group_indices values opt_filter - /// ``` - /// - /// In the example above, `value_fn` is invoked for each (group_index, - /// value) pair where `opt_filter[i]` is true and values is non null - /// - /// ```text - /// value_fn(2, 200) - /// value_fn(0, 200) - /// value_fn(0, 300) - /// ``` - /// - /// It also sets + /// When value_fn is called it also sets /// /// 1. `self.seen_values[group_index]` to true for all rows that had a non null vale pub fn accumulate( @@ -134,105 +107,14 @@ impl NullState { T: ArrowPrimitiveType + Send, F: FnMut(usize, T::Native) + Send, { - let data: &[T::Native] = values.values(); - assert_eq!(data.len(), group_indices.len()); - // ensure the seen_values is big enough (start everything at // "not seen" valid) let seen_values = initialize_builder(&mut self.seen_values, total_num_groups, false); - - match (values.null_count() > 0, opt_filter) { - // no nulls, no filter, - (false, None) => { - let iter = group_indices.iter().zip(data.iter()); - for (&group_index, &new_value) in iter { - seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); - } - } - // nulls, no filter - (true, None) => { - let nulls = values.nulls().unwrap(); - // This is based on (ahem, COPY/PASTE) arrow::compute::aggregate::sum - // iterate over in chunks of 64 bits for more efficient null checking - let group_indices_chunks = group_indices.chunks_exact(64); - let data_chunks = data.chunks_exact(64); - let bit_chunks = nulls.inner().bit_chunks(); - - let group_indices_remainder = group_indices_chunks.remainder(); - let data_remainder = data_chunks.remainder(); - - group_indices_chunks - .zip(data_chunks) - .zip(bit_chunks.iter()) - .for_each(|((group_index_chunk, data_chunk), mask)| { - // index_mask has value 1 << i in the loop - let mut index_mask = 1; - group_index_chunk.iter().zip(data_chunk.iter()).for_each( - |(&group_index, &new_value)| { - // valid bit was set, real value - let is_valid = (mask & index_mask) != 0; - if is_valid { - seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); - } - index_mask <<= 1; - }, - ) - }); - - // handle any remaining bits (after the initial 64) - let remainder_bits = bit_chunks.remainder_bits(); - group_indices_remainder - .iter() - .zip(data_remainder.iter()) - .enumerate() - .for_each(|(i, (&group_index, &new_value))| { - let is_valid = remainder_bits & (1 << i) != 0; - if is_valid { - seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); - } - }); - } - // no nulls, but a filter - (false, Some(filter)) => { - assert_eq!(filter.len(), group_indices.len()); - // The performance with a filter could be improved by - // iterating over the filter in chunks, rather than a single - // iterator. TODO file a ticket - group_indices - .iter() - .zip(data.iter()) - .zip(filter.iter()) - .for_each(|((&group_index, &new_value), filter_value)| { - if let Some(true) = filter_value { - seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); - } - }) - } - // both null values and filters - (true, Some(filter)) => { - assert_eq!(filter.len(), group_indices.len()); - // The performance with a filter could be improved by - // iterating over the filter in chunks, rather than using - // iterators. TODO file a ticket - filter - .iter() - .zip(group_indices.iter()) - .zip(values.iter()) - .for_each(|((filter_value, &group_index), new_value)| { - if let Some(true) = filter_value { - if let Some(new_value) = new_value { - seen_values.set_bit(group_index, true); - value_fn(group_index, new_value) - } - } - }) - } - } + accumulate(group_indices, values, opt_filter, |group_index, value| { + seen_values.set_bit(group_index, true); + value_fn(group_index, value); + }); } /// Invokes `value_fn(group_index, value)` for each non null, non @@ -351,6 +233,144 @@ impl NullState { } } +/// Invokes `value_fn(group_index, value)` for each non null, non +/// filtered value of `value`, +/// +/// # Arguments: +/// +/// * `group_indices`: To which groups do the rows in `values` belong, (aka group_index) +/// * `values`: the input arguments to the accumulator +/// * `opt_filter`: if present, only rows for which is Some(true) are included +/// * `value_fn`: function invoked for (group_index, value) where value is non null +/// +/// # Example +/// +/// ```text +/// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ +/// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ +/// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ +/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ +/// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ +/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ +/// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ +/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ +/// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ +/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ +/// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ +/// │ └─────┘ │ │ └─────┘ │ └─────┘ +/// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ +/// +/// group_indices values opt_filter +/// ``` +/// +/// In the example above, `value_fn` is invoked for each (group_index, +/// value) pair where `opt_filter[i]` is true and values is non null +/// +/// ```text +/// value_fn(2, 200) +/// value_fn(0, 200) +/// value_fn(0, 300) +/// ``` +pub fn accumulate( + group_indices: &[usize], + values: &PrimitiveArray, + opt_filter: Option<&BooleanArray>, + mut value_fn: F, +) where + T: ArrowPrimitiveType + Send, + F: FnMut(usize, T::Native) + Send, +{ + let data: &[T::Native] = values.values(); + assert_eq!(data.len(), group_indices.len()); + + match (values.null_count() > 0, opt_filter) { + // no nulls, no filter, + (false, None) => { + let iter = group_indices.iter().zip(data.iter()); + for (&group_index, &new_value) in iter { + value_fn(group_index, new_value); + } + } + // nulls, no filter + (true, None) => { + let nulls = values.nulls().unwrap(); + // This is based on (ahem, COPY/PASTE) arrow::compute::aggregate::sum + // iterate over in chunks of 64 bits for more efficient null checking + let group_indices_chunks = group_indices.chunks_exact(64); + let data_chunks = data.chunks_exact(64); + let bit_chunks = nulls.inner().bit_chunks(); + + let group_indices_remainder = group_indices_chunks.remainder(); + let data_remainder = data_chunks.remainder(); + + group_indices_chunks + .zip(data_chunks) + .zip(bit_chunks.iter()) + .for_each(|((group_index_chunk, data_chunk), mask)| { + // index_mask has value 1 << i in the loop + let mut index_mask = 1; + group_index_chunk.iter().zip(data_chunk.iter()).for_each( + |(&group_index, &new_value)| { + // valid bit was set, real value + let is_valid = (mask & index_mask) != 0; + if is_valid { + value_fn(group_index, new_value); + } + index_mask <<= 1; + }, + ) + }); + + // handle any remaining bits (after the initial 64) + let remainder_bits = bit_chunks.remainder_bits(); + group_indices_remainder + .iter() + .zip(data_remainder.iter()) + .enumerate() + .for_each(|(i, (&group_index, &new_value))| { + let is_valid = remainder_bits & (1 << i) != 0; + if is_valid { + value_fn(group_index, new_value); + } + }); + } + // no nulls, but a filter + (false, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + // The performance with a filter could be improved by + // iterating over the filter in chunks, rather than a single + // iterator. TODO file a ticket + group_indices + .iter() + .zip(data.iter()) + .zip(filter.iter()) + .for_each(|((&group_index, &new_value), filter_value)| { + if let Some(true) = filter_value { + value_fn(group_index, new_value); + } + }) + } + // both null values and filters + (true, Some(filter)) => { + assert_eq!(filter.len(), group_indices.len()); + // The performance with a filter could be improved by + // iterating over the filter in chunks, rather than using + // iterators. TODO file a ticket + filter + .iter() + .zip(group_indices.iter()) + .zip(values.iter()) + .for_each(|((filter_value, &group_index), new_value)| { + if let Some(true) = filter_value { + if let Some(new_value) = new_value { + value_fn(group_index, new_value) + } + } + }) + } + } +} + /// This function is called to update the accumulator state per row /// when the value is not needed (e.g. COUNT) /// diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs index 180f4ad3cf37..3534fb5b4d26 100644 --- a/datafusion/functions-aggregate/src/stddev.rs +++ b/datafusion/functions-aggregate/src/stddev.rs @@ -19,17 +19,21 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use arrow::array::Float64Array; use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; -use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, GroupsAccumulator, Signature, Volatility, +}; use datafusion_functions_aggregate_common::stats::StatsType; -use crate::variance::VarianceAccumulator; +use crate::variance::{VarianceAccumulator, VarianceGroupsAccumulator}; make_udaf_expr_and_func!( Stddev, @@ -118,6 +122,17 @@ impl AggregateUDFImpl for Stddev { fn aliases(&self) -> &[String] { &self.alias } + + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + !acc_args.is_distinct + } + + fn create_groups_accumulator( + &self, + _args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(StddevGroupsAccumulator::new(StatsType::Sample))) + } } make_udaf_expr_and_func!( @@ -201,6 +216,19 @@ impl AggregateUDFImpl for StddevPop { Ok(DataType::Float64) } + + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + !acc_args.is_distinct + } + + fn create_groups_accumulator( + &self, + _args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(StddevGroupsAccumulator::new( + StatsType::Population, + ))) + } } /// An accumulator to compute the average @@ -267,6 +295,57 @@ impl Accumulator for StddevAccumulator { } } +#[derive(Debug)] +pub struct StddevGroupsAccumulator { + variance: VarianceGroupsAccumulator, +} + +impl StddevGroupsAccumulator { + pub fn new(s_type: StatsType) -> Self { + Self { + variance: VarianceGroupsAccumulator::new(s_type), + } + } +} + +impl GroupsAccumulator for StddevGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.variance + .update_batch(values, group_indices, opt_filter, total_num_groups) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.variance + .merge_batch(values, group_indices, opt_filter, total_num_groups) + } + + fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result { + let (mut variances, nulls) = self.variance.variance(emit_to); + variances.iter_mut().for_each(|v| *v = v.sqrt()); + Ok(Arc::new(Float64Array::new(variances.into(), Some(nulls)))) + } + + fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result> { + self.variance.state(emit_to) + } + + fn size(&self) -> usize { + self.variance.size() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs index 4c78a42ea494..f5f2d06e3837 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -18,10 +18,11 @@ //! [`VarianceSample`]: variance sample aggregations. //! [`VariancePopulation`]: variance population aggregations. -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use arrow::{ - array::{ArrayRef, Float64Array, UInt64Array}, + array::{Array, ArrayRef, BooleanArray, Float64Array, UInt64Array}, + buffer::NullBuffer, compute::kernels::cast, datatypes::{DataType, Field}, }; @@ -32,9 +33,11 @@ use datafusion_common::{ use datafusion_expr::{ function::{AccumulatorArgs, StateFieldsArgs}, utils::format_state_name, - Accumulator, AggregateUDFImpl, Signature, Volatility, + Accumulator, AggregateUDFImpl, GroupsAccumulator, Signature, Volatility, +}; +use datafusion_functions_aggregate_common::{ + aggregate::groups_accumulator::accumulate::accumulate, stats::StatsType, }; -use datafusion_functions_aggregate_common::stats::StatsType; make_udaf_expr_and_func!( VarianceSample, @@ -122,6 +125,17 @@ impl AggregateUDFImpl for VarianceSample { fn aliases(&self) -> &[String] { &self.aliases } + + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + !acc_args.is_distinct + } + + fn create_groups_accumulator( + &self, + _args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(VarianceGroupsAccumulator::new(StatsType::Sample))) + } } pub struct VariancePopulation { @@ -196,6 +210,19 @@ impl AggregateUDFImpl for VariancePopulation { fn aliases(&self) -> &[String] { &self.aliases } + + fn groups_accumulator_supported(&self, acc_args: AccumulatorArgs) -> bool { + !acc_args.is_distinct + } + + fn create_groups_accumulator( + &self, + _args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(VarianceGroupsAccumulator::new( + StatsType::Population, + ))) + } } /// An accumulator to compute variance @@ -239,6 +266,36 @@ impl VarianceAccumulator { } } +#[inline] +fn merge( + count: u64, + mean: f64, + m2: f64, + count2: u64, + mean2: f64, + m22: f64, +) -> (u64, f64, f64) { + let new_count = count + count2; + let new_mean = + mean * count as f64 / new_count as f64 + mean2 * count2 as f64 / new_count as f64; + let delta = mean - mean2; + let new_m2 = + m2 + m22 + delta * delta * count as f64 * count2 as f64 / new_count as f64; + + (new_count, new_mean, new_m2) +} + +#[inline] +fn update(count: u64, mean: f64, m2: f64, value: f64) -> (u64, f64, f64) { + let new_count = count + 1; + let delta1 = value - mean; + let new_mean = delta1 / new_count as f64 + mean; + let delta2 = value - new_mean; + let new_m2 = m2 + delta1 * delta2; + + (new_count, new_mean, new_m2) +} + impl Accumulator for VarianceAccumulator { fn state(&mut self) -> Result> { Ok(vec![ @@ -253,15 +310,8 @@ impl Accumulator for VarianceAccumulator { let arr = downcast_value!(values, Float64Array).iter().flatten(); for value in arr { - let new_count = self.count + 1; - let delta1 = value - self.mean; - let new_mean = delta1 / new_count as f64 + self.mean; - let delta2 = value - new_mean; - let new_m2 = self.m2 + delta1 * delta2; - - self.count += 1; - self.mean = new_mean; - self.m2 = new_m2; + (self.count, self.mean, self.m2) = + update(self.count, self.mean, self.m2, value) } Ok(()) @@ -296,17 +346,14 @@ impl Accumulator for VarianceAccumulator { if c == 0_u64 { continue; } - let new_count = self.count + c; - let new_mean = self.mean * self.count as f64 / new_count as f64 - + means.value(i) * c as f64 / new_count as f64; - let delta = self.mean - means.value(i); - let new_m2 = self.m2 - + m2s.value(i) - + delta * delta * self.count as f64 * c as f64 / new_count as f64; - - self.count = new_count; - self.mean = new_mean; - self.m2 = new_m2; + (self.count, self.mean, self.m2) = merge( + self.count, + self.mean, + self.m2, + c, + means.value(i), + m2s.value(i), + ) } Ok(()) } @@ -344,3 +391,183 @@ impl Accumulator for VarianceAccumulator { true } } + +#[derive(Debug)] +pub struct VarianceGroupsAccumulator { + m2s: Vec, + means: Vec, + counts: Vec, + stats_type: StatsType, +} + +impl VarianceGroupsAccumulator { + pub fn new(s_type: StatsType) -> Self { + Self { + m2s: Vec::new(), + means: Vec::new(), + counts: Vec::new(), + stats_type: s_type, + } + } + + fn resize(&mut self, total_num_groups: usize) { + self.m2s.resize(total_num_groups, 0.0); + self.means.resize(total_num_groups, 0.0); + self.counts.resize(total_num_groups, 0); + } + + fn merge( + group_indices: &[usize], + counts: &UInt64Array, + means: &Float64Array, + m2s: &Float64Array, + opt_filter: Option<&BooleanArray>, + mut value_fn: F, + ) where + F: FnMut(usize, u64, f64, f64) + Send, + { + assert_eq!(counts.null_count(), 0); + assert_eq!(means.null_count(), 0); + assert_eq!(m2s.null_count(), 0); + + match opt_filter { + None => { + group_indices + .iter() + .zip(counts.values().iter()) + .zip(means.values().iter()) + .zip(m2s.values().iter()) + .for_each(|(((&group_index, &count), &mean), &m2)| { + value_fn(group_index, count, mean, m2); + }); + } + Some(filter) => { + group_indices + .iter() + .zip(counts.values().iter()) + .zip(means.values().iter()) + .zip(m2s.values().iter()) + .zip(filter.iter()) + .for_each( + |((((&group_index, &count), &mean), &m2), filter_value)| { + if let Some(true) = filter_value { + value_fn(group_index, count, mean, m2); + } + }, + ); + } + } + } + + pub fn variance( + &mut self, + emit_to: datafusion_expr::EmitTo, + ) -> (Vec, NullBuffer) { + let mut counts = emit_to.take_needed(&mut self.counts); + // means are only needed for updating m2s and are not needed for the final result. + // But we still need to take them to ensure the internal state is consistent. + let _ = emit_to.take_needed(&mut self.means); + let m2s = emit_to.take_needed(&mut self.m2s); + + if let StatsType::Sample = self.stats_type { + counts.iter_mut().for_each(|count| { + *count -= 1; + }); + } + let nulls = NullBuffer::from_iter(counts.iter().map(|&count| count != 0)); + let variance = m2s + .iter() + .zip(counts) + .map(|(m2, count)| m2 / count as f64) + .collect(); + (variance, nulls) + } +} + +impl GroupsAccumulator for VarianceGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 1, "single argument to update_batch"); + let values = &cast(&values[0], &DataType::Float64)?; + let values = downcast_value!(values, Float64Array); + + self.resize(total_num_groups); + accumulate(group_indices, values, opt_filter, |group_index, value| { + let (new_count, new_mean, new_m2) = update( + self.counts[group_index], + self.means[group_index], + self.m2s[group_index], + value, + ); + self.counts[group_index] = new_count; + self.means[group_index] = new_mean; + self.m2s[group_index] = new_m2; + }); + Ok(()) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&arrow::array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 3, "two arguments to merge_batch"); + // first batch is counts, second is partial means, third is partial m2s + let partial_counts = downcast_value!(values[0], UInt64Array); + let partial_means = downcast_value!(values[1], Float64Array); + let partial_m2s = downcast_value!(values[2], Float64Array); + + self.resize(total_num_groups); + Self::merge( + group_indices, + partial_counts, + partial_means, + partial_m2s, + opt_filter, + |group_index, partial_count, partial_mean, partial_m2| { + let (new_count, new_mean, new_m2) = merge( + self.counts[group_index], + self.means[group_index], + self.m2s[group_index], + partial_count, + partial_mean, + partial_m2, + ); + self.counts[group_index] = new_count; + self.means[group_index] = new_mean; + self.m2s[group_index] = new_m2; + }, + ); + Ok(()) + } + + fn evaluate(&mut self, emit_to: datafusion_expr::EmitTo) -> Result { + let (variances, nulls) = self.variance(emit_to); + Ok(Arc::new(Float64Array::new(variances.into(), Some(nulls)))) + } + + fn state(&mut self, emit_to: datafusion_expr::EmitTo) -> Result> { + let counts = emit_to.take_needed(&mut self.counts); + let means = emit_to.take_needed(&mut self.means); + let m2s = emit_to.take_needed(&mut self.m2s); + + Ok(vec![ + Arc::new(UInt64Array::new(counts.into(), None)), + Arc::new(Float64Array::new(means.into(), None)), + Arc::new(Float64Array::new(m2s.into(), None)), + ]) + } + + fn size(&self) -> usize { + self.m2s.capacity() * std::mem::size_of::() + + self.means.capacity() * std::mem::size_of::() + + self.counts.capacity() * std::mem::size_of::() + } +} diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index b826dd31f4d1..45cb4d4615d7 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -511,6 +511,85 @@ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq ---- 0.950438495292 +# csv_query_stddev_7 +query IR +SELECT c2, stddev_samp(c12) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.303641032262 +2 0.284581967411 +3 0.296002660506 +4 0.284324609109 +5 0.331034486752 + +# csv_query_stddev_8 +query IR +SELECT c2, stddev_pop(c12) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.296659845456 +2 0.278038978602 +3 0.288107833475 +4 0.278074953424 +5 0.318992813225 + +# csv_query_stddev_9 +query IR +SELECT c2, var_pop(c12) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.088007063906 +2 0.077305673622 +3 0.083006123709 +4 0.077325679722 +5 0.101756414889 + +# csv_query_stddev_10 +query IR +SELECT c2, var_samp(c12) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.092197876473 +2 0.080986896176 +3 0.087617575027 +4 0.080840483345 +5 0.109583831419 + +# csv_query_stddev_11 +query IR +SELECT c2, var_samp(c12) FROM aggregate_test_100 WHERE c12 > 0.90 GROUP BY c2 ORDER BY c2 +---- +1 0.000889240174 +2 0.000785878272 +3 NULL +4 NULL +5 0.000269544643 + +# Use PostgresSQL dialect +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# csv_query_stddev_12 +query IR +SELECT c2, var_samp(c12) FILTER (WHERE c12 > 0.90) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.000889240174 +2 0.000785878272 +3 NULL +4 NULL +5 0.000269544643 + +# Restore the default dialect +statement ok +set datafusion.sql_parser.dialect = 'Generic'; + +# csv_query_stddev_13 +query IR +SELECT c2, var_samp(CASE WHEN c12 > 0.90 THEN c12 ELSE null END) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2 +---- +1 0.000889240174 +2 0.000785878272 +3 NULL +4 NULL +5 0.000269544643 + + # csv_query_approx_median_1 query I SELECT approx_median(c2) FROM aggregate_test_100