add Histogram aggregation #1306

Merged · 14 commits · Mar 18, 2022
7 changes: 7 additions & 0 deletions src/aggregation/agg_req.rs
@@ -48,6 +48,7 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::bucket::HistogramAggregation;
pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};

@@ -123,12 +124,18 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "range")]
Range(RangeAggregation),
/// Put data into buckets of fixed-size intervals (histogram).
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
}

impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
BucketAggregationType::Histogram(histogram) => {
fast_field_names.insert(histogram.field.to_string())
}
};
}
}
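
With the `histogram` variant registered on the request enum above via `#[serde(rename = "histogram")]`, an aggregation request can select it by name in its JSON tree. A minimal sketch of what parsing such a request might look like (not part of this diff; the `Aggregations` path and the exact fields of `HistogramAggregation`, such as `interval`, are assumptions):

```rust
// Hypothetical usage sketch: parse an aggregation request that uses the new
// "histogram" bucket type. `Aggregations` is assumed to be the request map
// defined in agg_req.rs; `interval` is assumed to be a field of
// `HistogramAggregation` introduced elsewhere in this PR.
use tantivy::aggregation::agg_req::Aggregations;

fn parse_histogram_request() -> serde_json::Result<Aggregations> {
    let json = r#"{
        "prices": {
            "histogram": { "field": "price", "interval": 10.0 }
        }
    }"#;
    serde_json::from_str(json)
}
```
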
5 changes: 4 additions & 1 deletion src/aggregation/agg_req_with_accessor.rs
@@ -1,7 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, RangeAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
@@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
field: field_name,
ranges: _,
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
};
let sub_aggregation = sub_aggregation.clone();
Ok(BucketAggregationWithAccessor {
130 changes: 112 additions & 18 deletions src/aggregation/agg_result.rs
@@ -10,23 +10,31 @@ use std::collections::HashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::bucket::generate_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

impl From<IntermediateAggregationResults> for AggregationResults {
fn from(tree: IntermediateAggregationResults) -> Self {
Self(
tree.0
tree.buckets
.unwrap_or_default()
.into_iter()
.map(|(key, agg)| (key, agg.into()))
.map(|(key, bucket)| (key, AggregationResult::BucketResult(bucket.into())))
.chain(
tree.metrics
.unwrap_or_default()
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
)
.collect(),
)
}
@@ -41,18 +49,6 @@ pub enum AggregationResult {
/// Metric result variant.
MetricResult(MetricResult),
}
impl From<IntermediateAggregationResult> for AggregationResult {
fn from(tree: IntermediateAggregationResult) -> Self {
match tree {
IntermediateAggregationResult::Bucket(bucket) => {
AggregationResult::BucketResult(bucket.into())
}
IntermediateAggregationResult::Metric(metric) => {
AggregationResult::MetricResult(metric.into())
}
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
@@ -81,12 +77,22 @@ impl From<IntermediateMetricResult> for MetricResult {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range {
/// The range buckets sorted by range.
buckets: Vec<RangeBucketEntry>,
},
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets.
///
/// Whether there are holes between buckets depends on the request: if min_doc_count is 0,
/// there are no holes between the first and last bucket.
/// See [HistogramAggregation](super::bucket::HistogramAggregation).
buckets: Vec<BucketEntry>,
},
}

impl From<IntermediateBucketResult> for BucketResult {
@@ -106,6 +112,94 @@ impl From<IntermediateBucketResult> for BucketResult {
});
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets, req } => {
let buckets = if req.min_doc_count() == 0 {
// We need to fill in the missing buckets over the total range, so that there
// are no gaps
let minmax = buckets.iter().minmax_by_key(|bucket| bucket.key);
let all_buckets = match minmax {
itertools::MinMaxResult::MinMax(min, max) => {
generate_buckets(&req, min.key, max.key)
}
_ => vec![],
};

buckets
.into_iter()
.merge_join_by(all_buckets.into_iter(), |existing_bucket, all_bucket| {
existing_bucket
.key
.partial_cmp(all_bucket)
.unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => existing.into(),
itertools::EitherOrBoth::Left(existing) => existing.into(),
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: Default::default(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= req.min_doc_count())
.map(|bucket| bucket.into())
.collect_vec()
};

BucketResult::Histogram { buckets }
}
}
}
}
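
The branch above fills the gaps between collected buckets by merging them with the full list of expected bucket keys from `generate_buckets`, emitting an empty entry for every key that was never hit. A standalone sketch of that merge on plain `(key, doc_count)` pairs, using `itertools::merge_join_by` (both inputs must be sorted by key; all names are illustrative):

```rust
use std::cmp::Ordering;

use itertools::Itertools;

// Sketch of the gap-filling merge: `existing` holds collected buckets as
// (key, doc_count) pairs, `all_keys` holds every bucket key the request is
// expected to produce. Both inputs are assumed to be sorted by key.
fn fill_gaps(existing: Vec<(f64, u64)>, all_keys: Vec<f64>) -> Vec<(f64, u64)> {
    existing
        .into_iter()
        .merge_join_by(all_keys, |(key, _), all_key| {
            key.partial_cmp(all_key).unwrap_or(Ordering::Equal)
        })
        .map(|either| match either {
            // Key was collected: keep its count (the generated key is dropped).
            itertools::EitherOrBoth::Both(existing, _)
            | itertools::EitherOrBoth::Left(existing) => existing,
            // Key only exists in the generated range: emit an empty bucket.
            itertools::EitherOrBoth::Right(key) => (key, 0),
        })
        .collect()
}

// fill_gaps(vec![(0.0, 5), (20.0, 3)], vec![0.0, 10.0, 20.0])
// == vec![(0.0, 5), (10.0, 0), (20.0, 3)]
```
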

/// This is the default entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// {
/// ...
/// "my_histogram": {
/// "buckets": [
/// {
/// "key": "2.0",
/// "doc_count": 5
/// },
/// {
/// "key": "4.0",
/// "doc_count": 2
/// },
/// {
/// "key": "6.0",
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry {
/// The identifier of the bucket.
pub key: Key,
/// Number of documents in the bucket.
pub doc_count: u64,
#[serde(flatten)]
/// sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}

impl From<IntermediateHistogramBucketEntry> for BucketEntry {
fn from(entry: IntermediateHistogramBucketEntry) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
}
}
}
@@ -139,7 +233,7 @@ impl From<IntermediateBucketResult> for BucketResult {
/// }
/// ...
/// }
/// ```
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeBucketEntry {
/// The identifier of the bucket.