From fa2c817adc5e583b7054c451748720a79e59101f Mon Sep 17 00:00:00 2001
From: Zachary Tong
Date: Mon, 7 Oct 2019 18:49:29 -0400
Subject: [PATCH 1/3] Add BKD Optimization to Range aggregation

If the range aggregator is A) a top-level agg (no parent), B) matching
all the documents (match_all query) and C) does not contain unbounded
ranges, we can use the BKD tree to match documents faster than
collecting all the documents via the normal DocValue method.

If the aggregation meets the specific criteria above, the range agg
will attempt to collect by BKD. There is an additional constraint that
the estimated number of matching points should be < maxDocs*0.75. The
BKD approach loses its advantage over DV when most of the index is
being aggregated into at least one range.

As a side effect, this also introduces some new methods on
NumberFieldMapper so that we can encode the ranges into the Point
format without knowing about the underlying format (float, double,
long, etc.).

From tests, this appears to reduce latency by 70-90% in the optimized
case without grossly affecting the un-optimized case.
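As an illustrative sketch of that gating heuristic (not part of the
patch; the class and method names here are invented, the real logic
lives in RangeAggregator#tryBKDOptimization below):

    import org.apache.lucene.index.PointValues;

    final class BkdGateSketch {
        /** True if BKD collection looks cheaper than DocValue collection. */
        static boolean shouldUseBkd(PointValues points,
                                    PointValues.IntersectVisitor[] visitors,
                                    int maxDoc) throws java.io.IOException {
            long estimatedPoints = 0;
            for (PointValues.IntersectVisitor v : visitors) {
                // cheap upper-bound estimate; never calls grow()/visit()
                estimatedPoints += points.estimatePointCount(v);
            }
            return estimatedPoints < maxDoc * 0.75;
        }
    }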
---
 .../index/mapper/NumberFieldMapper.java       | 105 +++++++
 .../range/AbstractRangeAggregatorFactory.java |   2 +-
 .../GeoDistanceRangeAggregatorFactory.java    |   5 +-
 .../bucket/range/RangeAggregator.java         | 269 ++++++++++++++++--
 .../aggregations/metrics/MinAggregator.java   |   2 +-
 5 files changed, 356 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
index 927bce5d9d6dd..ebed596c3890b 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
@@ -193,6 +193,19 @@ public Number parsePoint(byte[] value) {
         return HalfFloatPoint.decodeDimension(value, 0);
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        float parsedValue = parse(value, coerce);
+        byte[] bytes = new byte[Integer.BYTES];
+        HalfFloatPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Integer.BYTES;
+    }
+
     @Override
     public Float parse(XContentParser parser, boolean coerce) throws IOException {
         float parsed = parser.floatValue(coerce);
@@ -290,6 +303,19 @@ public Number parsePoint(byte[] value) {
         return FloatPoint.decodeDimension(value, 0);
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        float parsedValue = parse(value, coerce);
+        byte[] bytes = new byte[Integer.BYTES];
+        FloatPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Integer.BYTES;
+    }
+
     @Override
     public Float parse(XContentParser parser, boolean coerce) throws IOException {
         float parsed = parser.floatValue(coerce);
@@ -376,6 +402,19 @@ public Number parsePoint(byte[] value) {
         return DoublePoint.decodeDimension(value, 0);
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        double parsedValue = parse(value, coerce);
+        byte[] bytes = new byte[Long.BYTES];
+        DoublePoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Long.BYTES;
+    }
+
     @Override
     public Double parse(XContentParser parser, boolean coerce) throws IOException {
         double parsed = parser.doubleValue(coerce);
@@ -473,6 +512,21 @@ public Number parsePoint(byte[] value) {
         return INTEGER.parsePoint(value).byteValue();
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        int parsedValue = parse(value, coerce);
+
+        // Same as integer
+        byte[] bytes = new byte[Integer.BYTES];
+        IntPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Integer.BYTES;
+    }
+
     @Override
     public Short parse(XContentParser parser, boolean coerce) throws IOException {
         int value = parser.intValue(coerce);
@@ -534,6 +588,21 @@ public Number parsePoint(byte[] value) {
         return INTEGER.parsePoint(value).shortValue();
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        int parsedValue = parse(value, coerce);
+
+        // Same as integer
+        byte[] bytes = new byte[Integer.BYTES];
+        IntPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Integer.BYTES;
+    }
+
     @Override
     public Short parse(XContentParser parser, boolean coerce) throws IOException {
         return parser.shortValue(coerce);
@@ -591,6 +660,19 @@ public Number parsePoint(byte[] value) {
         return IntPoint.decodeDimension(value, 0);
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        int parsedValue = parse(value, coerce);
+        byte[] bytes = new byte[Integer.BYTES];
+        IntPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Integer.BYTES;
+    }
+
     @Override
     public Integer parse(XContentParser parser, boolean coerce) throws IOException {
         return parser.intValue(coerce);
@@ -710,6 +792,19 @@ public Number parsePoint(byte[] value) {
         return LongPoint.decodeDimension(value, 0);
     }
 
+    @Override
+    public byte[] encodePoint(Number value, boolean coerce) {
+        long parsedValue = parse(value, coerce);
+        byte[] bytes = new byte[Long.BYTES];
+        LongPoint.encodeDimension(parsedValue, bytes, 0);
+        return bytes;
+    }
+
+    @Override
+    public int bytesPerEncodedPoint() {
+        return Long.BYTES;
+    }
+
     @Override
     public Long parse(XContentParser parser, boolean coerce) throws IOException {
         return parser.longValue(coerce);
@@ -827,6 +922,8 @@ public abstract Query rangeQuery(String field, Object lowerTerm, Object upperTer
     public abstract Number parse(XContentParser parser, boolean coerce) throws IOException;
     public abstract Number parse(Object value, boolean coerce);
     public abstract Number parsePoint(byte[] value);
+    public abstract byte[] encodePoint(Number value, boolean coerce);
+    public abstract int bytesPerEncodedPoint();
     public abstract List createFields(String name, Number value, boolean indexed, boolean docValued, boolean stored);
 
     Number valueForSearch(Number value) {
@@ -979,6 +1076,14 @@ public Number parsePoint(byte[] value) {
         return type.parsePoint(value);
     }
 
+    public byte[] encodePoint(Number value, boolean coerce) {
+        return type.encodePoint(value, coerce);
+    }
+
+    public int bytesPerEncodedPoint() {
+        return type.bytesPerEncodedPoint();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (super.equals(o) == false) {
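For orientation (not part of the patch): the new hooks can be exercised
directly, and the encoded bytes sort in numeric order under unsigned
comparison, which is what the BKD visitors below rely on. A minimal
sketch, assuming the enum is reachable as in the diff:

    import org.apache.lucene.util.FutureArrays;
    import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;

    final class EncodePointSketch {
        public static void main(String[] args) {
            int width = NumberType.LONG.bytesPerEncodedPoint();   // 8 for longs
            byte[] from = NumberType.LONG.encodePoint(5L, false);
            byte[] to = NumberType.LONG.encodePoint(9L, false);
            // Prints a negative number: 5 sorts before 9 in the encoded space,
            // so range endpoints compare byte-wise against BKD leaf values.
            System.out.println(FutureArrays.compareUnsigned(from, 0, width, to, 0, width));
        }
    }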
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
index d60851a2d7fef..de886acdad127 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
@@ -72,7 +72,7 @@ protected Aggregator doCreateInternal(Numeric valuesSource,
                                           boolean collectsFromSingleBucket,
                                           List pipelineAggregators,
                                           Map metaData) throws IOException {
-        return new RangeAggregator(name, factories, valuesSource, config.format(), rangeFactory, ranges, keyed, searchContext, parent,
+        return new RangeAggregator(name, factories, valuesSource, config, rangeFactory, ranges, keyed, searchContext, parent,
             pipelineAggregators, metaData);
     }
 
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
index 711297762b8d4..ee38298124e99 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
@@ -81,9 +81,8 @@ protected Aggregator doCreateInternal(final ValuesSource.GeoPoint valuesSource,
                                           List pipelineAggregators,
                                           Map metaData) throws IOException {
         DistanceSource distanceSource = new DistanceSource(valuesSource, distanceType, origin, unit);
-        return new RangeAggregator(name, factories, distanceSource, config.format(), rangeFactory, ranges, keyed, searchContext,
-            parent,
-            pipelineAggregators, metaData);
+        return new RangeAggregator(name, factories, distanceSource, config, rangeFactory, ranges, keyed, searchContext,
+            parent, pipelineAggregators, metaData);
     }
 
     private static class DistanceSource extends ValuesSource.Numeric {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
index c4e2d1fc4394e..434e1686d81ff 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
@@ -18,8 +18,15 @@
  */
 package org.elasticsearch.search.aggregations.bucket.range;
 
+import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.search.CollectionTerminatedException;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.DocIdSetBuilder;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -29,6 +36,9 @@
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -40,6 +50,7 @@
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -47,6 +58,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiFunction;
+
+import static org.apache.lucene.util.FutureArrays.compareUnsigned;
 
 public class RangeAggregator extends BucketsAggregator {
 
@@ -217,33 +231,59 @@ public boolean equals(Object obj) {
         }
     }
 
-    final ValuesSource.Numeric valuesSource;
+    private final ValuesSource.Numeric valuesSource;
     final DocValueFormat format;
     final Range[] ranges;
+    final boolean keyed;
-    final InternalRange.Factory rangeFactory;
+    private final InternalRange.Factory rangeFactory;
+    private final double[] maxTo;
+
+    private BiFunction pointEncoder;
+    private final String pointField;
+    private final boolean canOptimize;
+    private byte[][] encodedRanges;
-    final double[] maxTo;
 
-    public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-            InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, SearchContext context,
-            Aggregator parent, List pipelineAggregators, Map metaData) throws IOException {
+    public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, ValuesSourceConfig config,
+            InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, SearchContext context, Aggregator parent,
+            List pipelineAggregators, Map metaData) throws IOException {
 
         super(name, factories, context, parent, pipelineAggregators, metaData);
         assert valuesSource != null;
         this.valuesSource = valuesSource;
-        this.format = format;
+        this.format = config.format();
         this.keyed = keyed;
         this.rangeFactory = rangeFactory;
         this.ranges = ranges;
+        this.pointEncoder = configurePointEncoder(context, parent, config);
+
+        // Unbounded ranges collect most documents, so the BKD optimization doesn't
+        // help nearly as much
+        boolean rangesAreBounded = Double.isFinite(ranges[0].from);
 
-        maxTo = new double[this.ranges.length];
-        maxTo[0] = this.ranges[0].to;
-        for (int i = 1; i < this.ranges.length; ++i) {
-            maxTo[i] = Math.max(this.ranges[i].to,maxTo[i-1]);
+        maxTo = new double[ranges.length];
+        maxTo[0] = ranges[0].to;
+        for (int i = 1; i < ranges.length; ++i) {
+            maxTo[i] = Math.max(ranges[i].to, maxTo[i-1]);
+            rangesAreBounded &= Double.isFinite(ranges[i].to);
         }
 
+        if (pointEncoder != null && rangesAreBounded) {
+            pointField = config.fieldContext().field();
+            encodedRanges = new byte[ranges.length * 2][];
+            for (int i = 0; i < ranges.length; i++) {
+                byte[] from = Double.isFinite(ranges[i].from) ? pointEncoder.apply(ranges[i].from, false) : null;
+                byte[] to = Double.isFinite(ranges[i].to) ? pointEncoder.apply(ranges[i].to, false) : null;
+                encodedRanges[i*2] = from;
+                encodedRanges[i*2 + 1] = to;
+            }
+            canOptimize = true;
+        } else {
+            pointField = null;
+            pointEncoder = null;
+            canOptimize = false;
+        }
     }
 
     @Override
@@ -257,6 +297,24 @@ public ScoreMode scoreMode() {
 
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+
+        if (valuesSource == null) {
+            if (parent != null) {
+                return LeafBucketCollector.NO_OP_COLLECTOR;
+            } else {
+                // we have no parent and the values source is empty so we can skip collecting hits
+                throw new CollectionTerminatedException();
+            }
+        }
+
+        if (canOptimize) {
+            // if we can optimize, and we decide the optimization is better than DV collection,
+            // this will use the BKD to collect hits and then throw a CollectionTerminatedException
+            tryBKDOptimization(ctx, sub);
+        }
+
+        // We either cannot optimize, or have decided DVs would be faster, so
+        // fall back to collecting all the values from DVs directly
         final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
         return new LeafBucketCollectorBase(sub, values) {
             @Override
@@ -265,12 +323,14 @@ public void collect(int doc, long bucket) throws IOException {
                 final int valuesCount = values.docValueCount();
                 for (int i = 0, lo = 0; i < valuesCount; ++i) {
                     final double value = values.nextValue();
-                    lo = collect(doc, value, bucket, lo);
+                    lo = collectValue(doc, value, bucket, lo, sub);
                 }
             }
         }
+        };
+    }
 
-    private int collect(int doc, double value, long owningBucketOrdinal, int lowBound) throws IOException {
+    private int collectValue(int doc, double value, long owningBucketOrdinal, int lowBound, LeafBucketCollector sub) throws IOException {
         int lo = lowBound, hi = ranges.length - 1; // all candidates are between these indexes
         int mid = (lo + hi) >>> 1;
         while (lo <= hi) {
@@ -312,15 +372,181 @@ private int collect(int doc, double value, long owningBucketOrdinal, int lowBoun
 
         for (int i = startLo; i <= endHi; ++i) {
             if (ranges[i].matches(value)) {
-                collectBucket(sub, doc, subBucketOrdinal(owningBucketOrdinal, i));
+                collectBucket(sub, doc, subBucketOrdinal(owningBucketOrdinal, i));
             }
         }
         return endHi + 1;
     }
+
+    /**
+     * Attempt to collect these ranges via the BKD tree instead of DocValues.
+     *
+     * This estimates the number of matching points in the BKD tree. If it is
+     * less than 75% of maxDoc we attempt to use the BKD tree to collect values.
+     * The BKD tree is potentially much faster than DV collection because
+     * we only need to inspect leaves that overlap each range, rather than
+     * collecting all the values as with DVs. And since we only care about doc
+     * counts, we don't need to decode values when an entire leaf matches.
+     *
+     * If we use the BKD tree, when it is done collecting values a
+     * {@link CollectionTerminatedException} is thrown to signal completion
+     */
+    private void tryBKDOptimization(LeafReaderContext ctx, LeafBucketCollector sub) throws CollectionTerminatedException, IOException {
+        final PointValues pointValues = ctx.reader().getPointValues(pointField);
+        if (pointValues != null) {
+            PointValues.IntersectVisitor[] visitors = new PointValues.IntersectVisitor[ranges.length];
+            DocIdSetBuilder[] results = new DocIdSetBuilder[ranges.length];
+
+            final Bits liveDocs = ctx.reader().getLiveDocs();
+            int maxDoc = ctx.reader().maxDoc();
+            long estimatedPoints = 0;
+            for (int i = 0; i < ranges.length; i++) {
+                // OK to allocate DocIdSetBuilder now since it allocates memory lazily and the
+                // estimation won't call `grow()` on the visitor (only once we start intersecting)
+                results[i] = new DocIdSetBuilder(maxDoc);
+                visitors[i] = getVisitor(liveDocs, encodedRanges[i * 2], encodedRanges[i * 2 + 1], results[i]);
+                estimatedPoints += pointValues.estimatePointCount(visitors[i]);
+            }
+
+            if (estimatedPoints < maxDoc * 0.75) {
+                // We collect ranges individually since a doc can land in multiple ranges.
+                for (int i = 0; i < ranges.length; i++) {
+                    pointValues.intersect(visitors[i]);
+                    DocIdSetIterator iter = results[i].build().iterator();
+                    while (iter.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                        // Now that we know the matching docs, collect the bucket and sub-aggs
+                        //
+                        // NOTE: because we're in the BKD optimization, we know there is no parent agg
+                        // and bucket ordinals are zero-based offset by range ordinal
+                        collectBucket(sub, iter.docID(), i);
+                    }
+                    // free this DocIdSet since we no longer need it, and it could be holding
+                    // a non-negligible amount of memory
+                    results[i] = null;
+                }
+                throw new CollectionTerminatedException();
+            }
+        }
+    }
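Not part of the diff -- a compact sketch of the per-range collection
flow used above, assuming a visitor that feeds matching docs into its
DocIdSetBuilder:

    import java.io.IOException;
    import org.apache.lucene.index.PointValues;
    import org.apache.lucene.search.DocIdSetIterator;
    import org.apache.lucene.util.DocIdSetBuilder;

    final class BkdCollectSketch {
        /** Drains one range: walk the tree, then replay the matching doc ids. */
        static void collectRange(PointValues points, PointValues.IntersectVisitor visitor,
                                 DocIdSetBuilder result) throws IOException {
            points.intersect(visitor);                       // visitor fills `result`
            DocIdSetIterator it = result.build().iterator();
            for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
                // the real aggregator calls collectBucket(sub, doc, rangeOrdinal) here
            }
        }
    }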
+
+    /**
+     * Returns a BKD intersection visitor for the provided range (`from` inclusive, `to` exclusive)
+     */
+    private PointValues.IntersectVisitor getVisitor(Bits liveDocs, byte[] from, byte[] to, DocIdSetBuilder result) {
+
+
+        return new PointValues.IntersectVisitor() {
+            DocIdSetBuilder.BulkAdder adder;
+
+            @Override
+            public void grow(int count) {
+                adder = result.grow(count);
+            }
+
+            @Override
+            public void visit(int docID) {
+                if ((liveDocs == null || liveDocs.get(docID))) {
+                    adder.add(docID);
+                }
+            }
+
+            @Override
+            public void visit(int docID, byte[] packedValue) {
+                int packedLength = packedValue.length;
+
+                // Value is inside range if value >= from && value < to
+                boolean inside = (from == null || compareUnsigned(packedValue, 0, packedValue.length, from, 0, from.length) >= 0)
+                    && (to == null || compareUnsigned(packedValue, 0, packedLength, to, 0, to.length) < 0);
+
+                if (inside) {
+                    visit(docID);
+                }
+            }
+
+            @Override
+            public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException {
+                int packedLength = packedValue.length;
+
+                // Value is inside range if value >= from && value < to
+                boolean inside = (from == null || compareUnsigned(packedValue, 0, packedValue.length, from, 0, from.length) >= 0)
+                    && (to == null || compareUnsigned(packedValue, 0, packedLength, to, 0, to.length) < 0);
+
+                if (inside) {
+                    while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                        visit(iterator.docID());
+                    }
+                }
+            }
+
+            @Override
+            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
+
+                int packedLength = minPackedValue.length;
+
+                // max < from (exclusive, since ranges are inclusive on from)
+                if (from != null && compareUnsigned(maxPackedValue, 0, packedLength, from, 0, from.length) < 0) {
+                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
+                }
+                // min >= from (inclusive, since ranges are exclusive on to)
+                if (to != null && compareUnsigned(minPackedValue, 0, packedLength, to, 0, to.length) >= 0) {
+                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
+                }
+
+                // Leaf is fully inside this range if min >= from && max < to
+                if (
+                    // `from` is unbounded or `min >= from`
+                    (from == null || compareUnsigned(minPackedValue, 0, packedLength, from, 0, from.length) >= 0)
+                    &&
+                    // `to` is unbounded or `max < to`
+                    (to == null || compareUnsigned(maxPackedValue, 0, packedLength, to, 0, to.length) < 0)
+                ) {
+                    return PointValues.Relation.CELL_INSIDE_QUERY;
+                }
+
+                // If we're not outside, and not fully inside, we must be crossing
+                return PointValues.Relation.CELL_CROSSES_QUERY;
+
+            }
+        };
+    }
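A conceptual re-statement of compare() above on plain longs, with the
patch's [from, to) semantics (illustrative only; the real code compares
encoded bytes unsigned):

    import org.apache.lucene.index.PointValues.Relation;

    final class RelationSketch {
        static Relation relate(long min, long max, long from, long to) {
            if (max < from) return Relation.CELL_OUTSIDE_QUERY;  // cell entirely below the range
            if (min >= to) return Relation.CELL_OUTSIDE_QUERY;   // cell entirely at/above `to`
            if (min >= from && max < to) return Relation.CELL_INSIDE_QUERY; // fully contained
            return Relation.CELL_CROSSES_QUERY;                  // partial overlap
        }
        public static void main(String[] args) {
            System.out.println(relate(3, 5, 3, 6)); // CELL_INSIDE_QUERY
            System.out.println(relate(2, 4, 3, 6)); // CELL_CROSSES_QUERY
            System.out.println(relate(6, 9, 3, 6)); // CELL_OUTSIDE_QUERY
        }
    }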
+
+    /**
+     * Returns a converter for point values if BKD optimization is applicable to
+     * the context, or null otherwise. Optimization criteria are:
+     *   - match_all query
+     *   - no parent agg
+     *   - no script
+     *   - no missing value
+     *   - has indexed points
+     *
+     * @param context The {@link SearchContext} of the aggregation.
+     * @param parent  The parent aggregator.
+     * @param config  The config for the values source metric.
+     */
+    private BiFunction configurePointEncoder(SearchContext context, Aggregator parent,
+                                             ValuesSourceConfig config) {
+        if (context.query() != null &&
+            context.query().getClass() != MatchAllDocsQuery.class) {
+            return null;
+        }
+        if (parent != null) {
+            return null;
+        }
+        if (config.fieldContext() != null && config.script() == null && config.missing() == null) {
+            MappedFieldType fieldType = config.fieldContext().fieldType();
+            if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
+                return null;
+            }
+            if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
+                return ((NumberFieldMapper.NumberFieldType) fieldType)::encodePoint;
+            } else if (fieldType.getClass() == DateFieldMapper.DateFieldType.class) {
+                return NumberFieldMapper.NumberType.LONG::encodePoint;
+            }
+        }
+        return null;
+    }
+
     private long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) {
         return owningBucketOrdinal * ranges.length + rangeOrd;
     }
 
@@ -328,13 +554,13 @@ private long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) {
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
         consumeBucketsAndMaybeBreak(ranges.length);
-        List buckets = new ArrayList<>(ranges.length);
+        List buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; i++) {
             Range range = ranges[i];
             final long bucketOrd = subBucketOrdinal(owningBucketOrdinal, i);
-            org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
-                rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),
-                    bucketAggregations(bucketOrd), keyed, format);
+            InternalRange.Bucket bucket =
+                rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),
+                    bucketAggregations(bucketOrd), keyed, format);
             buckets.add(bucket);
         }
         // value source can be null in the case of unmapped fields
@@ -344,11 +570,10 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
     @Override
     public InternalAggregation buildEmptyAggregation() {
         InternalAggregations subAggs = buildEmptySubAggregations();
-        List buckets = new ArrayList<>(ranges.length);
+        List buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; i++) {
             Range range = ranges[i];
-            org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
-                rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, format);
+            InternalRange.Bucket bucket = rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, format);
             buckets.add(bucket);
         }
         // value source can be null in the case of unmapped fields
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java
index 0a40347a3cf4b..38841d7e4d809 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java
@@ -181,7 +181,7 @@ static Function getPointReaderOrNull(SearchContext context, Aggr
         if (parent != null) {
             return null;
         }
-        if (config.fieldContext() != null && config.script() == null) {
+        if (config.fieldContext() != null && config.script() == null && config.missing() == null) {
             MappedFieldType fieldType = config.fieldContext().fieldType();
             if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
                 return null;
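An aside on the ordinal math in subBucketOrdinal above (illustrative):
with dense per-bucket ordinals, a top-level range agg (owning ordinal 0)
maps range i straight to bucket ordinal i, which is what lets the BKD
path call collectBucket(sub, doc, i) directly:

    final class OrdinalSketch {
        static long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd, int numRanges) {
            return owningBucketOrdinal * numRanges + rangeOrd;
        }
        public static void main(String[] args) {
            System.out.println(subBucketOrdinal(0, 2, 3)); // 2: top level, ord == rangeOrd
            System.out.println(subBucketOrdinal(4, 2, 3)); // 14: under a parent bucket
        }
    }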
From a5ed1e045699263c89df48a2d5ca4e2d0006d7f3 Mon Sep 17 00:00:00 2001
From: Zachary Tong
Date: Wed, 20 Nov 2019 15:36:08 -0500
Subject: [PATCH 2/3] WIP: Wrap sub-aggs with MultiBucketAggregatorWrapper

---
 .../aggregations/AggregatorFactories.java     | 12 +++-
 .../aggregations/AggregatorFactory.java       | 12 ++--
 .../range/AbstractRangeAggregatorFactory.java | 66 +++++++++++++------
 .../bucket/range/RangeAggregator.java         |  4 +-
 .../search/aggregations/bucket/RangeIT.java   | 55 ++++++++++++++++
 5 files changed, 119 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
index 81611f2c8e1b2..d37251a22981e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
@@ -164,18 +164,26 @@ public AggParseContext(String name) {
 
     public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>());
 
-    private AggregatorFactory[] factories;
+    protected AggregatorFactory[] factories;
     private List pipelineAggregatorFactories;
 
     public static Builder builder() {
         return new Builder();
     }
 
-    private AggregatorFactories(AggregatorFactory[] factories, List pipelineAggregators) {
+    protected AggregatorFactories(AggregatorFactory[] factories, List pipelineAggregators) {
         this.factories = factories;
         this.pipelineAggregatorFactories = pipelineAggregators;
     }
 
+    public AggregatorFactory[] getFactories() {
+        return factories;
+    }
+
+    public List getPipelineAggregatorFactories() {
+        return pipelineAggregatorFactories;
+    }
+
     public List createPipelineAggregators() {
         List pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size());
         for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
index 970ef725f027d..1d54a86200a3e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
@@ -37,15 +37,15 @@ public abstract class AggregatorFactory {
 
     public static final class MultiBucketAggregatorWrapper extends Aggregator {
-        private final BigArrays bigArrays;
-        private final Aggregator parent;
-        private final AggregatorFactory factory;
+        protected final BigArrays bigArrays;
+        protected final AggregatorFactory factory;
+        protected ObjectArray aggregators;
+        protected ObjectArray collectors;
+        protected final Aggregator parent;
         private final Aggregator first;
-        ObjectArray aggregators;
-        ObjectArray collectors;
 
         MultiBucketAggregatorWrapper(BigArrays bigArrays, SearchContext context,
-                Aggregator parent, AggregatorFactory factory, Aggregator first) {
+                                     Aggregator parent, AggregatorFactory factory, Aggregator first) {
             this.bigArrays = bigArrays;
             this.parent = parent;
             this.factory = factory;
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
index de886acdad127..b32462fab3e6d 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
@@ -31,6 +31,8 @@
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.profile.Profilers;
+import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;
 
 import java.io.IOException;
 import java.util.List;
@@ -42,15 +44,15 @@ public class AbstractRangeAggregatorFactory extends ValuesSourc
 
     private final R[] ranges;
     private final boolean keyed;
 
-    public AbstractRangeAggregatorFactory(String name,
-                                          ValuesSourceConfig config,
-                                          R[] ranges,
-                                          boolean keyed,
-                                          InternalRange.Factory rangeFactory,
-                                          QueryShardContext queryShardContext,
-                                          AggregatorFactory parent,
-                                          AggregatorFactories.Builder subFactoriesBuilder,
-                                          Map metaData) throws IOException {
+    AbstractRangeAggregatorFactory(String name,
+                                   ValuesSourceConfig config,
+                                   R[] ranges,
+                                   boolean keyed,
+                                   InternalRange.Factory rangeFactory,
+                                   QueryShardContext queryShardContext,
+                                   AggregatorFactory parent,
+                                   AggregatorFactories.Builder subFactoriesBuilder,
+                                   Map metaData) throws IOException {
         super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
         this.ranges = ranges;
         this.keyed = keyed;
@@ -59,21 +61,47 @@ public AbstractRangeAggregatorFactory(String name,
 
     @Override
     protected Aggregator createUnmapped(SearchContext searchContext,
-                                        Aggregator parent,
-                                        List pipelineAggregators,
-                                        Map metaData) throws IOException {
+                                        Aggregator parent,
+                                        List pipelineAggregators,
+                                        Map metaData) throws IOException {
         return new Unmapped<>(name, ranges, keyed, config.format(), searchContext, parent, rangeFactory, pipelineAggregators, metaData);
     }
 
     @Override
     protected Aggregator doCreateInternal(Numeric valuesSource,
-                                          SearchContext searchContext,
-                                          Aggregator parent,
-                                          boolean collectsFromSingleBucket,
-                                          List pipelineAggregators,
-                                          Map metaData) throws IOException {
-        return new RangeAggregator(name, factories, valuesSource, config, rangeFactory, ranges, keyed, searchContext, parent,
-            pipelineAggregators, metaData);
+                                          SearchContext searchContext,
+                                          Aggregator parent,
+                                          boolean collectsFromSingleBucket,
+                                          List pipelineAggregators,
+                                          Map metaData) throws IOException {
+
+        AggregatorFactories wrappedFactories = factories;
+
+        // If we don't have a parent, the range agg can potentially optimize by using the BKD tree. But BKD
+        // traversal is per-range, which means that docs are potentially collected out-of-order across multiple
+        // ranges. To prevent this from causing problems, we create a special AggregatorFactories that
+        // wraps all the sub-aggs with a MultiBucketAggregatorWrapper. This effectively creates a new agg
+        // sub-tree for each range and prevents out-of-order problems
+        if (parent == null) {
+            wrappedFactories = new AggregatorFactories(factories.getFactories(), factories.getPipelineAggregatorFactories()) {
+                @Override
+                public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
+                    Aggregator[] aggregators = new Aggregator[countAggregators()];
+                    for (int i = 0; i < this.factories.length; ++i) {
+                        Aggregator factory = asMultiBucketAggregator(factories[i], searchContext, parent);
+                        Profilers profilers = factory.context().getProfilers();
+                        if (profilers != null) {
+                            factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
+                        }
+                        aggregators[i] = factory;
+                    }
+                    return aggregators;
+                }
+            };
+        }
+
+        return new RangeAggregator(name, wrappedFactories, valuesSource, config, rangeFactory, ranges, keyed, searchContext, parent,
+            pipelineAggregators, metaData);
     }
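To see why the wrapping matters (illustrative, not part of the diff):
each per-range BKD pass replays doc ids from the start of the segment,
so a single shared sub-collector would observe non-monotonic doc ids:

    final class OutOfOrderSketch {
        public static void main(String[] args) {
            int[][] perRange = { {2, 5, 9}, {1, 5, 7} }; // ids emitted by range 0, then range 1
            int last = -1;
            for (int[] range : perRange) {
                for (int doc : range) {
                    if (doc < last) {
                        // a shared sub-agg would see ids go backwards here (9 -> 1)
                        System.out.println("out of order: " + last + " -> " + doc);
                    }
                    last = doc;
                }
            }
        }
    }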
canOptimize = true; } else { pointField = null; - pointEncoder = null; canOptimize = false; } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java index f7a1ce30d1ed7..d64a36511320d 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.bucket.range.Range; import org.elasticsearch.search.aggregations.bucket.range.Range.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matchers; @@ -48,6 +49,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.range; import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; @@ -116,6 +118,18 @@ public void setupSuiteScopeCluster() throws Exception { .endObject())); } + createIndex("idx_big"); + numDocs = 5000; + builders = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + builders.add(client().prepareIndex("idx_big").setSource(jsonBuilder() + .startObject() + .field(SINGLE_VALUED_FIELD_NAME, i+1) + .startArray(MULTI_VALUED_FIELD_NAME).value(i+1).value(i+2).endArray() + .endObject())); + } + client().admin().indices().prepareForceMerge("idx_big").setMaxNumSegments(1).get(); + // Create two indices and add the field 'route_length_miles' as an alias in // one, and a concrete field in the other. 
prepareCreate("old_index") @@ -243,6 +257,47 @@ public void testSingleValueField() throws Exception { assertThat(bucket.getDocCount(), equalTo(numDocs - 5L)); } + public void testMultipleRangesWithMaxBKDOptimization() throws Exception { + SearchResponse response = client().prepareSearch("idx_big") + .addAggregation(range("range") + .field(SINGLE_VALUED_FIELD_NAME) + .addRange(3, 6) + .addRange(6, 10) + .subAggregation(max("the_max").field(SINGLE_VALUED_FIELD_NAME))) + .get(); + + assertSearchResponse(response); + + + Range range = response.getAggregations().get("range"); + assertThat(range, notNullValue()); + assertThat(range.getName(), equalTo("range")); + List buckets = range.getBuckets(); + assertThat(buckets.size(), equalTo(2)); + + Range.Bucket bucket = buckets.get(0); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo("3.0-6.0")); + assertThat(((Number) bucket.getFrom()).doubleValue(), equalTo(3.0)); + assertThat(((Number) bucket.getTo()).doubleValue(), equalTo(6.0)); + assertThat(bucket.getFromAsString(), equalTo("3.0")); + assertThat(bucket.getToAsString(), equalTo("6.0")); + assertThat(bucket.getDocCount(), equalTo(3L)); + InternalMax max = bucket.getAggregations().get("the_max"); + assertThat(max.getValue(), equalTo(5.0)); + + bucket = buckets.get(1); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo("6.0-10.0")); + assertThat(((Number) bucket.getFrom()).doubleValue(), equalTo(6.0)); + assertThat(((Number) bucket.getTo()).doubleValue(), equalTo(10.0)); + assertThat(bucket.getFromAsString(), equalTo("6.0")); + assertThat(bucket.getToAsString(), equalTo("10.0")); + assertThat(bucket.getDocCount(), equalTo(4L)); + max = bucket.getAggregations().get("the_max"); + assertThat(max.getValue(), equalTo(9.0)); + } + public void testSingleValueFieldWithFormat() throws Exception { SearchResponse response = client() .prepareSearch("idx") From c0f649dca7c12402b7676bf95403bc5464daac48 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 9 Dec 2019 11:58:24 -0500 Subject: [PATCH 3/3] Address review comments - HalfFloatPoint.BYTES - Remove heuristics, always apply BKD optim if possible - Move optim decision up to factory --- .../index/mapper/NumberFieldMapper.java | 2 +- .../range/AbstractRangeAggregatorFactory.java | 88 ++++++++++++++---- .../GeoDistanceRangeAggregatorFactory.java | 2 +- .../bucket/range/RangeAggregator.java | 92 +++++-------------- .../search/aggregations/bucket/RangeIT.java | 12 ++- 5 files changed, 102 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java index ebed596c3890b..c4c61dc16f15d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java @@ -203,7 +203,7 @@ public byte[] encodePoint(Number value, boolean coerce) { @Override public int bytesPerEncodedPoint() { - return Integer.BYTES; + return HalfFloatPoint.BYTES; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java index b32462fab3e6d..55870b0573ce3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java +++ 
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
index b32462fab3e6d..55870b0573ce3 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
@@ -19,6 +19,11 @@
 package org.elasticsearch.search.aggregations.bucket.range;
 
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -37,6 +42,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 public class AbstractRangeAggregatorFactory extends ValuesSourceAggregatorFactory {
 
@@ -75,34 +81,78 @@ protected Aggregator doCreateInternal(Numeric valuesSource,
                                           List pipelineAggregators,
                                           Map metaData) throws IOException {
 
-        AggregatorFactories wrappedFactories = factories;
-
         // If we don't have a parent, the range agg can potentially optimize by using the BKD tree. But BKD
         // traversal is per-range, which means that docs are potentially collected out-of-order across multiple
         // ranges. To prevent this from causing problems, we create a special AggregatorFactories that
         // wraps all the sub-aggs with a MultiBucketAggregatorWrapper. This effectively creates a new agg
         // sub-tree for each range and prevents out-of-order problems
-        if (parent == null) {
-            wrappedFactories = new AggregatorFactories(factories.getFactories(), factories.getPipelineAggregatorFactories()) {
-                @Override
-                public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
-                    Aggregator[] aggregators = new Aggregator[countAggregators()];
-                    for (int i = 0; i < this.factories.length; ++i) {
-                        Aggregator factory = asMultiBucketAggregator(factories[i], searchContext, parent);
-                        Profilers profilers = factory.context().getProfilers();
-                        if (profilers != null) {
-                            factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
-                        }
-                        aggregators[i] = factory;
-                    }
-                    return aggregators;
-                }
-            };
+        BiFunction pointEncoder = configurePointEncoder(searchContext, parent, config);
+        AggregatorFactories wrappedFactories = factories;
+        if (pointEncoder != null) {
+            wrappedFactories = wrapSubAggsAsMultiBucket(factories);
         }
 
         return new RangeAggregator(name, wrappedFactories, valuesSource, config, rangeFactory, ranges, keyed, searchContext, parent,
-            pipelineAggregators, metaData);
+            pipelineAggregators, metaData, pointEncoder);
     }
 
+    /**
+     * Returns a converter for point values if BKD optimization is applicable to
+     * the context, or null otherwise. Optimization criteria are:
+     *   - match_all query
+     *   - no parent agg
+     *   - no script
+     *   - no missing value
+     *   - has indexed points
+     *
+     * @param context The {@link SearchContext} of the aggregation.
+     * @param parent  The parent aggregator.
+     * @param config  The config for the values source metric.
+     */
+    private BiFunction configurePointEncoder(SearchContext context, Aggregator parent,
+                                             ValuesSourceConfig config) {
+        if (context.query() != null &&
+            context.query().getClass() != MatchAllDocsQuery.class) {
+            return null;
+        }
+        if (parent != null) {
+            return null;
+        }
+        if (config.fieldContext() != null && config.script() == null && config.missing() == null) {
+            MappedFieldType fieldType = config.fieldContext().fieldType();
+            if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
+                return null;
+            }
+            if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
+                return ((NumberFieldMapper.NumberFieldType) fieldType)::encodePoint;
+            } else if (fieldType.getClass() == DateFieldMapper.DateFieldType.class) {
+                return NumberFieldMapper.NumberType.LONG::encodePoint;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Creates a new {@link AggregatorFactories} object so that sub-aggs are automatically
+     * wrapped with a {@link org.elasticsearch.search.aggregations.AggregatorFactory.MultiBucketAggregatorWrapper}.
+     * This allows sub-aggs to execute in their own isolated sub-tree
+     */
+    private static AggregatorFactories wrapSubAggsAsMultiBucket(AggregatorFactories factories) {
+        return new AggregatorFactories(factories.getFactories(), factories.getPipelineAggregatorFactories()) {
+            @Override
+            public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
+                Aggregator[] aggregators = new Aggregator[countAggregators()];
+                for (int i = 0; i < this.factories.length; ++i) {
+                    Aggregator factory = asMultiBucketAggregator(factories[i], searchContext, parent);
+                    Profilers profilers = factory.context().getProfilers();
+                    if (profilers != null) {
+                        factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
+                    }
+                    aggregators[i] = factory;
+                }
+                return aggregators;
+            }
+        };
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
index ee38298124e99..eeabad42ce577 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
@@ -82,7 +82,7 @@ protected Aggregator doCreateInternal(final ValuesSource.GeoPoint valuesSource,
                                           Map metaData) throws IOException {
         DistanceSource distanceSource = new DistanceSource(valuesSource, distanceType, origin, unit);
         return new RangeAggregator(name, factories, distanceSource, config, rangeFactory, ranges, keyed, searchContext,
-            parent, pipelineAggregators, metaData);
+            parent, pipelineAggregators, metaData, null);
     }
 
     private static class DistanceSource extends ValuesSource.Numeric {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
index fdd88d7e2b6e7..0b866acb84d6e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
@@ -18,16 +18,15 @@
  */
 package org.elasticsearch.search.aggregations.bucket.range;
 
-import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.DocIdSetBuilder;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -36,9 +35,6 @@
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
-import org.elasticsearch.index.mapper.DateFieldMapper;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -241,12 +237,13 @@ public boolean equals(Object obj) {
 
     private final String pointField;
     private final boolean canOptimize;
-    private byte[][] encodedRanges;
+    private final byte[][] encodedRanges;
 
     public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, ValuesSourceConfig config,
                            InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, SearchContext context, Aggregator parent,
-                           List pipelineAggregators, Map metaData) throws IOException {
+                           List pipelineAggregators, Map metaData,
+                           BiFunction pointEncoder) throws IOException {
 
         super(name, factories, context, parent, pipelineAggregators, metaData);
         assert valuesSource != null;
@@ -255,20 +252,14 @@ public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.
         this.keyed = keyed;
         this.rangeFactory = rangeFactory;
         this.ranges = ranges;
-        BiFunction pointEncoder = configurePointEncoder(context, parent, config);
-
-        // Unbounded ranges collect most documents, so the BKD optimization doesn't
-        // help nearly as much
-        boolean rangesAreBounded = Double.isFinite(ranges[0].from);
 
         maxTo = new double[ranges.length];
         maxTo[0] = ranges[0].to;
         for (int i = 1; i < ranges.length; ++i) {
             maxTo[i] = Math.max(ranges[i].to, maxTo[i-1]);
-            rangesAreBounded &= Double.isFinite(ranges[i].to);
         }
 
-        if (pointEncoder != null && rangesAreBounded) {
+        if (pointEncoder != null) {
             pointField = config.fieldContext().field();
             encodedRanges = new byte[ranges.length * 2][];
             for (int i = 0; i < ranges.length; i++) {
@@ -281,6 +272,7 @@ public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.
         } else {
             pointField = null;
             canOptimize = false;
+            encodedRanges = null;
         }
     }
@@ -393,25 +385,20 @@ private int collectValue(int doc, double value, long owningBucketOrdinal, int lo
     private void tryBKDOptimization(LeafReaderContext ctx, LeafBucketCollector sub) throws CollectionTerminatedException, IOException {
         final PointValues pointValues = ctx.reader().getPointValues(pointField);
         if (pointValues != null) {
-            PointValues.IntersectVisitor[] visitors = new PointValues.IntersectVisitor[ranges.length];
-            DocIdSetBuilder[] results = new DocIdSetBuilder[ranges.length];
-
             final Bits liveDocs = ctx.reader().getLiveDocs();
             int maxDoc = ctx.reader().maxDoc();
-            long estimatedPoints = 0;
-            for (int i = 0; i < ranges.length; i++) {
-                // OK to allocate DocIdSetBuilder now since it allocates memory lazily and the
-                // estimation won't call `grow()` on the visitor (only once we start intersecting)
-                results[i] = new DocIdSetBuilder(maxDoc);
-                visitors[i] = getVisitor(liveDocs, encodedRanges[i * 2], encodedRanges[i * 2 + 1], results[i]);
-                estimatedPoints += pointValues.estimatePointCount(visitors[i]);
-            }
 
-            if (estimatedPoints < maxDoc * 0.75) {
+            try {
+                // pre-allocate what our DocIdSetBuilder will use as worst-case estimate
+                addRequestCircuitBreakerBytes(maxDoc / 8);
+
                 // We collect ranges individually since a doc can land in multiple ranges.
                 for (int i = 0; i < ranges.length; i++) {
-                    pointValues.intersect(visitors[i]);
-                    DocIdSetIterator iter = results[i].build().iterator();
+                    DocIdSetBuilder result = new DocIdSetBuilder(maxDoc);
+                    PointValues.IntersectVisitor visitor = getVisitor(liveDocs, encodedRanges[i * 2], encodedRanges[i * 2 + 1], result);
+
+                    pointValues.intersect(visitor);
+                    DocIdSetIterator iter = result.build().iterator();
                     while (iter.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                         // Now that we know the matching docs, collect the bucket and sub-aggs
                         //
                         // NOTE: because we're in the BKD optimization, we know there is no parent agg
                         // and bucket ordinals are zero-based offset by range ordinal
                         collectBucket(sub, iter.docID(), i);
                     }
-                    // free this DocIdSet since we no longer need it, and it could be holding
-                    // a non-negligible amount of memory
-                    results[i] = null;
                 }
                 throw new CollectionTerminatedException();
+
+            } catch (CircuitBreakingException e) {
+                // If we tripped the breaker the DocIdSetBuilder is (potentially) too large.
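For scale (illustrative arithmetic, not part of the diff): the worst
case for a DocIdSetBuilder is degrading to a bitset of maxDoc bits,
i.e. maxDoc / 8 bytes, which is what the breaker reservation above
accounts for:

    final class BreakerSizingSketch {
        public static void main(String[] args) {
            int maxDoc = 10_000_000;
            long worstCaseBytes = maxDoc / 8; // one bit per doc
            System.out.println(worstCaseBytes); // 1250000 (~1.2 MB per segment)
        }
    }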
+                // Exit without throwing CollectionTerminatedException so we can fall back to the old method
+            } finally {
+                // Make sure we account for DocIdSetBuilder deallocation
+                addRequestCircuitBreakerBytes(-maxDoc / 8);
             }
         }
     }
@@ -433,7 +424,6 @@ private void tryBKDOptimization(LeafReaderContext ctx, LeafBucketCollector sub)
      */
     private PointValues.IntersectVisitor getVisitor(Bits liveDocs, byte[] from, byte[] to, DocIdSetBuilder result) {
 
-
         return new PointValues.IntersectVisitor() {
             DocIdSetBuilder.BulkAdder adder;
 
@@ -486,7 +476,7 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
                 if (from != null && compareUnsigned(maxPackedValue, 0, packedLength, from, 0, from.length) < 0) {
                     return PointValues.Relation.CELL_OUTSIDE_QUERY;
                 }
-                // min >= from (inclusive, since ranges are exclusive on to)
+                // min >= to (inclusive, since ranges are exclusive on to)
                 if (to != null && compareUnsigned(minPackedValue, 0, packedLength, to, 0, to.length) >= 0) {
                     return PointValues.Relation.CELL_OUTSIDE_QUERY;
                 }
@@ -509,42 +499,6 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
         };
     }
 
-    /**
-     * Returns a converter for point values if BKD optimization is applicable to
-     * the context, or null otherwise. Optimization criteria are:
-     *   - match_all query
-     *   - no parent agg
-     *   - no script
-     *   - no missing value
-     *   - has indexed points
-     *
-     * @param context The {@link SearchContext} of the aggregation.
-     * @param parent  The parent aggregator.
-     * @param config  The config for the values source metric.
-     */
-    private BiFunction configurePointEncoder(SearchContext context, Aggregator parent,
-                                             ValuesSourceConfig config) {
-        if (context.query() != null &&
-            context.query().getClass() != MatchAllDocsQuery.class) {
-            return null;
-        }
-        if (parent != null) {
-            return null;
-        }
-        if (config.fieldContext() != null && config.script() == null && config.missing() == null) {
-            MappedFieldType fieldType = config.fieldContext().fieldType();
-            if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
-                return null;
-            }
-            if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
-                return ((NumberFieldMapper.NumberFieldType) fieldType)::encodePoint;
-            } else if (fieldType.getClass() == DateFieldMapper.DateFieldType.class) {
-                return NumberFieldMapper.NumberType.LONG::encodePoint;
-            }
-        }
-        return null;
-    }
-
     private long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) {
         return owningBucketOrdinal * ranges.length + rangeOrd;
     }
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
index d64a36511320d..5a33251bcdf24 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
@@ -18,9 +18,11 @@
  */
 package org.elasticsearch.search.aggregations.bucket;
 
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.fielddata.ScriptDocValues;
 import org.elasticsearch.plugins.Plugin;
@@ -68,6 +70,7 @@ public class RangeIT extends ESIntegTestCase {
 
     private static final String MULTI_VALUED_FIELD_NAME = "l_values";
     static int numDocs;
+    static int numDocsBigIndex;
 
     @Override
     protected Collection> nodePlugins() {
@@ -119,15 +122,16 @@ public void setupSuiteScopeCluster() throws Exception {
         }
 
         createIndex("idx_big");
-        numDocs = 5000;
-        builders = new ArrayList<>();
-        for (int i = 0; i < numDocs; i++) {
-            builders.add(client().prepareIndex("idx_big").setSource(jsonBuilder()
+        numDocsBigIndex = 5000;
+        BulkRequestBuilder bulkBuilder = client().prepareBulk("idx_big");
+        for (int i = 0; i < numDocsBigIndex; i++) {
+            bulkBuilder.add(client().prepareIndex().setSource(jsonBuilder()
                 .startObject()
                 .field(SINGLE_VALUED_FIELD_NAME, i+1)
                 .startArray(MULTI_VALUED_FIELD_NAME).value(i+1).value(i+2).endArray()
                 .endObject()));
         }
+        bulkBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         client().admin().indices().prepareForceMerge("idx_big").setMaxNumSegments(1).get();
 
         // Create two indices and add the field 'route_length_miles' as an alias in