Skip to content

Commit

Permalink
Add value_count mode to rate agg (#63687) (#63847)
Browse files Browse the repository at this point in the history
Adds a new value count mode to the rate aggregation.

Closes #63575
  • Loading branch information
imotov authored Oct 19, 2020
1 parent 1fa232e commit 5ebe90d
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 29 deletions.
82 changes: 80 additions & 2 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ be automatically calculated by multiplying monthly rate by 12.
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

Instead of counting the number of documents, it is also possible to calculate a sum of all values of the fields in the documents in each
bucket. The following request will group all sales records into monthly bucket and than calculate the total monthly sales and convert them
into average daily sales.
bucket or the number of values in each bucket. The following request will group all sales records into monthly bucket and than calculate
the total monthly sales and convert them into average daily sales.

[source,console]
--------------------------------------------------
Expand Down Expand Up @@ -164,6 +164,84 @@ The response will contain the average daily sale prices for each month.
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

By adding the `mode` parameter with the value `value_count`, we can change the calculation from `sum` to the number of values of the field:

[source,console]
--------------------------------------------------
GET sales/_search
{
"size": 0,
"aggs": {
"by_date": {
"date_histogram": {
"field": "date",
"calendar_interval": "month" <1>
},
"aggs": {
"avg_number_of_sales_per_year": {
"rate": {
"field": "price", <2>
"unit": "year", <3>
"mode": "value_count" <4>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> Histogram is grouped by month.
<2> Calculate number of of all sale prices
<3> Convert to annual counts
<4> Changing the mode to value count

The response will contain the average daily sale prices for each month.

[source,console-result]
--------------------------------------------------
{
...
"aggregations" : {
"by_date" : {
"buckets" : [
{
"key_as_string" : "2015/01/01 00:00:00",
"key" : 1420070400000,
"doc_count" : 3,
"avg_number_of_sales_per_year" : {
"value" : 36.0
}
},
{
"key_as_string" : "2015/02/01 00:00:00",
"key" : 1422748800000,
"doc_count" : 2,
"avg_number_of_sales_per_year" : {
"value" : 24.0
}
},
{
"key_as_string" : "2015/03/01 00:00:00",
"key" : 1425168000000,
"doc_count" : 2,
"avg_number_of_sales_per_year" : {
"value" : 24.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

By default `sum` mode is used.

`"mode": "sum"`:: calculate the sum of all values field
`"mode": "value_count"`:: use the number of values in the field

The `mode` parameter can only be used with fields and scripts.

==== Relationship between bucket sizes and rate

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si
protected final ValuesSource valuesSource;
private final DocValueFormat format;
private final Rounding.DateTimeUnit rateUnit;
protected final RateMode rateMode;
private final SizedBucketAggregator sizedBucketAggregator;

protected DoubleArray sums;
Expand All @@ -35,6 +36,7 @@ public AbstractRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand All @@ -45,8 +47,12 @@ public AbstractRateAggregator(
if (valuesSource != null) {
sums = context.bigArrays().newDoubleArray(1, true);
compensations = context.bigArrays().newDoubleArray(1, true);
if (rateMode == null) {
rateMode = RateMode.SUM;
}
}
this.rateUnit = rateUnit;
this.rateMode = rateMode;
this.sizedBucketAggregator = findSizedBucketAncestor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ public HistogramRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
}

@Override
Expand All @@ -51,7 +52,18 @@ public void collect(int doc, long bucket) throws IOException {
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
kahanSummation.add(sketch.value());
final double value;
switch (rateMode) {
case SUM:
value = sketch.value();
break;
case VALUE_COUNT:
value = sketch.count();
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}
kahanSummation.add(value);
compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ public NumericRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
}

@Override
Expand All @@ -51,10 +52,17 @@ public void collect(int doc, long bucket) throws IOException {
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

for (int i = 0; i < valuesCount; i++) {
double value = values.nextValue();
kahanSummation.add(value);
switch (rateMode) {
case SUM:
for (int i = 0; i < valuesCount; i++) {
kahanSummation.add(values.nextValue());
}
break;
case VALUE_COUNT:
kahanSummation.add(valuesCount);
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}

compensations.set(bucket, kahanSummation.delta());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
*/
package org.elasticsearch.xpack.analytics.rate;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -24,13 +29,10 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
public static final String NAME = "rate";
public static final ParseField UNIT_FIELD = new ParseField("unit");
public static final ParseField MODE_FIELD = new ParseField("mode");
public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
NAME,
RateAggregatorSupplier.class
Expand All @@ -40,9 +42,11 @@ public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafO
static {
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false, false);
PARSER.declareString(RateAggregationBuilder::rateUnit, UNIT_FIELD);
PARSER.declareString(RateAggregationBuilder::rateMode, MODE_FIELD);
}

Rounding.DateTimeUnit rateUnit;
RateMode rateMode;

public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
RateAggregatorFactory.registerAggregators(builder);
Expand All @@ -58,6 +62,8 @@ protected RateAggregationBuilder(
Map<String, Object> metadata
) {
super(clone, factoriesBuilder, metadata);
this.rateUnit = clone.rateUnit;
this.rateMode = clone.rateMode;
}

@Override
Expand All @@ -76,6 +82,11 @@ public RateAggregationBuilder(StreamInput in) throws IOException {
} else {
rateUnit = null;
}
if (in.getVersion().onOrAfter(Version.V_7_11_0)) {
if (in.readBoolean()) {
rateMode = in.readEnum(RateMode.class);
}
}
}

@Override
Expand All @@ -90,6 +101,14 @@ protected void innerWriteTo(StreamOutput out) throws IOException {
} else {
out.writeByte((byte) 0);
}
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
if (rateMode != null) {
out.writeBoolean(true);
out.writeEnum(rateMode);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand All @@ -104,14 +123,22 @@ protected RateAggregatorFactory innerBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
return new RateAggregatorFactory(name, config, rateUnit, context, parent, subFactoriesBuilder, metadata);
if (field() == null && script() == null) {
if (rateMode != null) {
throw new IllegalArgumentException("The mode parameter is only supported with field or script");
}
}
return new RateAggregatorFactory(name, config, rateUnit, rateMode, context, parent, subFactoriesBuilder, metadata);
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (rateUnit != null) {
builder.field(UNIT_FIELD.getPreferredName(), rateUnit.shortName());
}
if (rateMode != null) {
builder.field(MODE_FIELD.getPreferredName(), rateMode.value());
}
return builder;
}

Expand All @@ -129,6 +156,15 @@ public RateAggregationBuilder rateUnit(Rounding.DateTimeUnit rateUnit) {
return this;
}

public RateAggregationBuilder rateMode(String rateMode) {
return rateMode(RateMode.resolve(rateMode));
}

public RateAggregationBuilder rateMode(RateMode rateMode) {
this.rateMode = rateMode;
return this;
}

static Rounding.DateTimeUnit parse(String rateUnit) {
Rounding.DateTimeUnit parsedRate = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(rateUnit);
if (parsedRate == null) {
Expand All @@ -140,17 +176,7 @@ static Rounding.DateTimeUnit parse(String rateUnit) {
@Override
protected ValuesSourceConfig resolveConfig(AggregationContext context) {
if (field() == null && script() == null) {
return new ValuesSourceConfig(
CoreValuesSourceType.NUMERIC,
null,
true,
null,
null,
1.0,
null,
DocValueFormat.RAW,
context
);
return new ValuesSourceConfig(CoreValuesSourceType.NUMERIC, null, true, null, null, 1.0, null, DocValueFormat.RAW, context);
} else {
return super.resolveConfig(context);
}
Expand All @@ -162,11 +188,11 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RateAggregationBuilder that = (RateAggregationBuilder) o;
return rateUnit == that.rateUnit;
return rateUnit == that.rateUnit && rateMode == that.rateMode;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rateUnit);
return Objects.hash(super.hashCode(), rateUnit, rateMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {

private final Rounding.DateTimeUnit rateUnit;

private final RateMode rateMode;

RateAggregatorFactory(
String name,
ValuesSourceConfig config,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metadata);
this.rateUnit = rateUnit;
this.rateMode = rateMode;
}

static void registerAggregators(ValuesSourceRegistry.Builder builder) {
Expand All @@ -60,7 +64,7 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
return new AbstractRateAggregator(name, config, rateUnit, rateMode, searchContext, parent, metadata) {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
return LeafBucketCollector.NO_OP_COLLECTOR;
Expand All @@ -77,6 +81,6 @@ protected Aggregator doCreateInternal(
) throws IOException {
return context.getValuesSourceRegistry()
.getAggregator(RateAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, rateUnit, searchContext, parent, metadata);
.build(name, config, rateUnit, rateMode, searchContext, parent, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Aggregator build(
String name,
ValuesSourceConfig valuesSourceConfig,
Rounding.DateTimeUnit rateUnit,
RateMode rateMode,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
Expand Down
Loading

0 comments on commit 5ebe90d

Please sign in to comment.