diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index c01377d976189..3a7053d26d2f4 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -131,7 +131,10 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare // to be unbounded and most instances may only aggregate few // documents, so use hashed based // global ordinals to keep the bucket ords dense. - if (Aggregator.descendsFromBucketAggregator(parent)) { + // Additionally, if using partitioned terms the regular global + // ordinals would be sparse so we opt for hash + if (Aggregator.descendsFromBucketAggregator(parent) || + (includeExclude != null && includeExclude.isPartitionBased())) { execution = ExecutionMode.GLOBAL_ORDINALS_HASH; } else { if (factories == AggregatorFactories.EMPTY) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java index e751c54fb16e9..98e7ebdbb1915 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/IncludeExclude.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.search.aggregations.bucket.terms.support; +import com.carrotsearch.hppc.BitMixer; import com.carrotsearch.hppc.LongHashSet; import com.carrotsearch.hppc.LongSet; @@ -35,6 +36,7 @@ import org.apache.lucene.util.automaton.Operations; import org.apache.lucene.util.automaton.RegExp; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.io.stream.StreamInput; @@ -46,6 +48,7 @@ import org.elasticsearch.search.DocValueFormat; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -61,15 +64,34 @@ public class IncludeExclude implements Writeable, ToXContent { private static final ParseField INCLUDE_FIELD = new ParseField("include"); private static final ParseField EXCLUDE_FIELD = new ParseField("exclude"); private static final ParseField PATTERN_FIELD = new ParseField("pattern"); + private static final ParseField PARTITION_FIELD = new ParseField("partition"); + private static final ParseField NUM_PARTITIONS_FIELD = new ParseField("num_partitions"); // The includeValue and excludeValue ByteRefs which are the result of the parsing // process are converted into a LongFilter when used on numeric fields // in the index. 
- public static class LongFilter { + public abstract static class LongFilter { + public abstract boolean accept(long value); + + } + + public class PartitionedLongFilter extends LongFilter { + private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + + @Override + public boolean accept(long value) { + // hash the value to keep even distributions + final long hashCode = BitMixer.mix64(value); + return Math.floorMod(hashCode, incNumPartitions) == incZeroBasedPartition; + } + } + + + public static class SetBackedLongFilter extends LongFilter { private LongSet valids; private LongSet invalids; - private LongFilter(int numValids, int numInvalids) { + private SetBackedLongFilter(int numValids, int numInvalids) { if (numValids > 0) { valids = new LongHashSet(numValids); } @@ -96,6 +118,13 @@ public abstract static class StringFilter { public abstract boolean accept(BytesRef value); } + class PartitionedStringFilter extends StringFilter { + @Override + public boolean accept(BytesRef value) { + return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition; + } + } + static class AutomatonBackedStringFilter extends StringFilter { private final ByteRunAutomaton runAutomaton; @@ -138,6 +167,25 @@ public abstract static class OrdinalsFilter { } + class PartitionedOrdinalsFilter extends OrdinalsFilter { + + @Override + public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException { + final long numOrds = globalOrdinals.getValueCount(); + final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds); + final TermsEnum termEnum = globalOrdinals.termsEnum(); + + BytesRef term = termEnum.next(); + while (term != null) { + if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) { + acceptedGlobalOrdinals.set(termEnum.ord()); + } + term = termEnum.next(); + } + return acceptedGlobalOrdinals; + } + } + static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter { private final CompiledAutomaton compiled; @@ -205,6 +253,8 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws private final RegExp include, exclude; private final SortedSet includeValues, excludeValues; + private final int incZeroBasedPartition; + private final int incNumPartitions; /** * @param include The regular expression pattern for the terms to be included @@ -218,6 +268,8 @@ public IncludeExclude(RegExp include, RegExp exclude) { this.exclude = exclude; this.includeValues = null; this.excludeValues = null; + this.incZeroBasedPartition = 0; + this.incNumPartitions = 0; } public IncludeExclude(String include, String exclude) { @@ -234,6 +286,8 @@ public IncludeExclude(SortedSet includeValues, SortedSet exc } this.include = null; this.exclude = null; + this.incZeroBasedPartition = 0; + this.incNumPartitions = 0; this.includeValues = includeValues; this.excludeValues = excludeValues; } @@ -250,6 +304,21 @@ public IncludeExclude(long[] includeValues, long[] excludeValues) { this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues)); } + public IncludeExclude(int partition, int numPartitions) { + if (partition < 0 || partition >= numPartitions) { + throw new IllegalArgumentException("Partition must be >=0 and < numPartition which is "+numPartitions); + } + this.incZeroBasedPartition = partition; + this.incNumPartitions = numPartitions; + this.include = null; + this.exclude = null; + this.includeValues = null; + this.excludeValues = null; + + } + + + /** * Read from a stream. 
*/ @@ -257,6 +326,8 @@ public IncludeExclude(StreamInput in) throws IOException { if (in.readBoolean()) { includeValues = null; excludeValues = null; + incZeroBasedPartition = 0; + incNumPartitions = 0; String includeString = in.readOptionalString(); include = includeString == null ? null : new RegExp(includeString); String excludeString = in.readOptionalString(); @@ -283,6 +354,13 @@ public IncludeExclude(StreamInput in) throws IOException { } else { excludeValues = null; } + if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) { + incNumPartitions = in.readVInt(); + incZeroBasedPartition = in.readVInt(); + } else { + incNumPartitions = 0; + incZeroBasedPartition = 0; + } } @Override @@ -309,6 +387,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBytesRef(value); } } + if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) { + out.writeVInt(incNumPartitions); + out.writeVInt(incZeroBasedPartition); + } } } @@ -436,11 +518,26 @@ public boolean token(String currentFieldName, XContentParser.Token token, XConte if (token == XContentParser.Token.START_OBJECT) { if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + + // This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0 + // Regexes should be "include":"foo.*" if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token == XContentParser.Token.VALUE_STRING) { if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) { otherOptions.put(INCLUDE_FIELD, parser.text()); + } else { + throw new ElasticsearchParseException( + "Unknown string parameter in Include/Exclude clause: " + currentFieldName); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) { + otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue()); + } else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) { + otherOptions.put(INCLUDE_FIELD, parser.intValue()); + } else { + throw new ElasticsearchParseException( + "Unknown numeric parameter in Include/Exclude clause: " + currentFieldName); } } } @@ -480,15 +577,43 @@ private Set parseArrayToSet(XContentParser parser) throws IOException public IncludeExclude createIncludeExclude(Map otherOptions) { Object includeObject = otherOptions.get(INCLUDE_FIELD); String include = null; + int partition = -1; + int numPartitions = -1; SortedSet includeValues = null; if (includeObject != null) { if (includeObject instanceof String) { include = (String) includeObject; } else if (includeObject instanceof SortedSet) { includeValues = (SortedSet) includeObject; + } else if (includeObject instanceof Integer) { + partition = (Integer) includeObject; + Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD); + if (numPartitionsObject instanceof Integer) { + numPartitions = (Integer) numPartitionsObject; + if (numPartitions < 2) { + throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be >1"); + } + if (partition < 0 || partition >= numPartitions) { + throw new IllegalArgumentException( + PARTITION_FIELD.getPreferredName() + " must be >=0 and <" + numPartitions); + } + } else { + if (numPartitionsObject == null) { + throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing"); + } + throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer"); + } } } 
Object excludeObject = otherOptions.get(EXCLUDE_FIELD); + if (numPartitions > 0) { + if (excludeObject != null) { + throw new IllegalArgumentException("Partitioned Include cannot be used in combination with excludes"); + } + return new IncludeExclude(partition, numPartitions); + } + + + String exclude = null; SortedSet excludeValues = null; if (excludeObject != null) { @@ -517,6 +642,10 @@ public boolean isRegexBased() { return include != null || exclude != null; } + public boolean isPartitionBased() { + return incNumPartitions > 0; + } + private Automaton toAutomaton() { Automaton a = null; if (include != null) { @@ -538,6 +667,9 @@ public StringFilter convertToStringFilter(DocValueFormat format) { if (isRegexBased()) { return new AutomatonBackedStringFilter(toAutomaton()); } + if (isPartitionBased()) { + return new PartitionedStringFilter(); + } return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format)); } @@ -559,13 +691,22 @@ public OrdinalsFilter convertToOrdinalsFilter(DocValueFormat format) { if (isRegexBased()) { return new AutomatonBackedOrdinalsFilter(toAutomaton()); } + if (isPartitionBased()) { + return new PartitionedOrdinalsFilter(); + } + return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format)); } public LongFilter convertToLongFilter(DocValueFormat format) { + + if (isPartitionBased()) { + return new PartitionedLongFilter(); + } + int numValids = includeValues == null ? 0 : includeValues.size(); int numInvalids = excludeValues == null ? 0 : excludeValues.size(); - LongFilter result = new LongFilter(numValids, numInvalids); + SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids); if (includeValues != null) { for (BytesRef val : includeValues) { result.addAccept(format.parseLong(val.utf8ToString(), false, null)); @@ -580,9 +721,13 @@ public LongFilter convertToLongFilter(DocValueFormat format) { } public LongFilter convertToDoubleFilter() { + if (isPartitionBased()) { + return new PartitionedLongFilter(); + } + int numValids = includeValues == null ? 0 : includeValues.size(); int numInvalids = excludeValues == null ?
0 : excludeValues.size(); - LongFilter result = new LongFilter(numValids, numInvalids); + SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids); if (includeValues != null) { for (BytesRef val : includeValues) { double dval = Double.parseDouble(val.utf8ToString()); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java index 1dc9943e8a3c4..ef477553bacea 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.avg.Avg; import org.elasticsearch.search.aggregations.metrics.max.Max; @@ -48,10 +49,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -359,6 +362,43 @@ private void testIncludeExcludeResults(double[] includes, double[] excludes, dou assertThat(bucket.getDocCount(), equalTo(1L)); } } + + public void testSingleValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME); + } + + public void testMultiValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME); + } + + private void runTestFieldWithPartitionedFiltering(String field) throws Exception { + // Find total number of unique terms + SearchResponse allResponse = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(allResponse); + Terms terms = allResponse.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + int expectedCardinality = terms.getBuckets().size(); + + // Gather terms using partitioned aggregations + final int numPartitions = randomIntBetween(2, 4); + Set foundTerms = new HashSet<>(); + for (int partition = 0; partition < numPartitions; partition++) { + SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field) + .includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(response); + terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + for (Bucket bucket : terms.getBuckets()) { + assertTrue(foundTerms.add(bucket.getKeyAsNumber())); + } + } + assertEquals(expectedCardinality, foundTerms.size()); + } public void testSingleValueFieldOrderedByTermAsc() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") diff 
--git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java index 1739d09a05447..35905f91a91ca 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.avg.Avg; import org.elasticsearch.search.aggregations.metrics.max.Max; @@ -47,10 +48,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -326,6 +329,48 @@ private void testIncludeExcludeResults(long[] includes, long[] excludes, long[] assertThat(bucket.getDocCount(), equalTo(1L)); } } + + + + public void testSingleValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME); + } + + public void testMultiValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME); + } + + private void runTestFieldWithPartitionedFiltering(String field) throws Exception { + // Find total number of unique terms + SearchResponse allResponse = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms").field(field).collectMode(randomFrom(SubAggCollectionMode.values()))).execute().actionGet(); + assertSearchResponse(allResponse); + Terms terms = allResponse.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + int expectedCardinality = terms.getBuckets().size(); + + // Gather terms using partitioned aggregations + final int numPartitions = randomIntBetween(2, 4); + Set foundTerms = new HashSet<>(); + for (int partition = 0; partition < numPartitions; partition++) { + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .addAggregation( + terms("terms").field(field).includeExclude(new IncludeExclude(partition, numPartitions)) + .collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(response); + terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + for (Bucket bucket : terms.getBuckets()) { + assertFalse(foundTerms.contains(bucket.getKeyAsNumber())); + foundTerms.add(bucket.getKeyAsNumber()); + } + } + assertEquals(expectedCardinality, foundTerms.size()); + } + public void testSingleValueFieldWithMaxSize() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java index 3d5d13bf04a18..46af395c476ea 100644 --- 
a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsIT.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.search.aggregations.bucket; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.automaton.RegExp; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -37,6 +39,7 @@ import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode; import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.avg.Avg; @@ -54,10 +57,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -455,6 +460,44 @@ public void testSingleValueFieldWithExactTermFiltering() throws Exception { } } + + public void testSingleValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME); + } + + public void testMultiValueFieldWithPartitionedFiltering() throws Exception { + runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME); + } + + private void runTestFieldWithPartitionedFiltering(String field) throws Exception { + // Find total number of unique terms + SearchResponse allResponse = client().prepareSearch("idx").setTypes("type") + .addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(allResponse); + Terms terms = allResponse.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + int expectedCardinality = terms.getBuckets().size(); + + // Gather terms using partitioned aggregations + final int numPartitions = randomIntBetween(2, 4); + Set foundTerms = new HashSet<>(); + for (int partition = 0; partition < numPartitions; partition++) { + SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field) + .includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values()))) + .execute().actionGet(); + assertSearchResponse(response); + terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + for (Bucket bucket : terms.getBuckets()) { + assertTrue(foundTerms.add(bucket.getKeyAsString())); + } + } + assertEquals(expectedCardinality, foundTerms.size()); + } + public void testSingleValueFieldWithMaxSize() throws Exception { SearchResponse response = client() diff --git a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc index fb3baca09679f..180bcad1d0b63 100644 --- a/docs/reference/aggregations/bucket/terms-aggregation.asciidoc +++ 
b/docs/reference/aggregations/bucket/terms-aggregation.asciidoc @@ -514,7 +514,10 @@ TIP: for indexed scripts replace the `file` parameter with an `id` parameter. ==== Filtering Values It is possible to filter the values for which buckets will be created. This can be done using the `include` and -`exclude` parameters which are based on regular expression strings or arrays of exact values. +`exclude` parameters which are based on regular expression strings or arrays of exact values. Additionally, +`include` clauses can filter using `partition` expressions. + +===== Filtering Values with regular expressions [source,js] -------------------------------------------------- @@ -538,6 +541,8 @@ both are defined, the `exclude` has precedence, meaning, the `include` is evalua The syntax is the same as <>. +===== Filtering Values with exact values + For matching based on exact values the `include` and `exclude` parameters can simply take an array of strings that represent the terms as they are found in the index: @@ -561,6 +566,67 @@ strings that represent the terms as they are found in the index: } -------------------------------------------------- +===== Filtering Values with partitions + +Sometimes there are too many unique terms to process in a single request/response pair, so +it can be useful to break the analysis up into multiple requests. +This can be achieved by grouping the field's values into a number of partitions at query-time and processing +only one partition in each request. +Consider this request, which is looking for accounts that have not logged any access recently: + +[source,js] +-------------------------------------------------- +{ + "size": 0, + "aggs": { + "expired_sessions": { + "terms": { + "field": "account_id", + "include": { + "partition": 0, + "num_partitions": 20 + }, + "size": 10000, + "order": { + "last_access": "asc" + } + }, + "aggs": { + "last_access": { + "max": { + "field": "access_date" + } + } + } + } + } +} +-------------------------------------------------- + +This request finds the last logged access date for a subset of customer accounts because we +might want to expire some customer accounts that haven't been seen for a long while. +The `num_partitions` setting has requested that the unique account_ids be organized evenly into twenty +partitions (0 to 19), and the `partition` setting in this request filters to only consider account_ids falling +into partition 0. Subsequent requests should ask for partitions 1, then 2, and so on, to complete the expired-account analysis. +Note that the `size` setting for the number of results returned needs to be tuned together with `num_partitions`. +For this particular account-expiration example, the process for balancing values for `size` and `num_partitions` would be as follows: + +1. Use the `cardinality` aggregation to estimate the total number of unique account_id values (see the example request below) +2. Pick a value for `num_partitions` to break the number from step 1 up into more manageable chunks +3. Pick a `size` value for the number of responses we want from each partition +4. Run a test request
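As a rough sketch of step 1 above (illustrative only, not part of this change; the `unique_account_count` aggregation name is arbitrary, and `account_id` is simply the field from the example above), the cardinality estimate could look like:

[source,js]
--------------------------------------------------
{
  "size": 0,
  "aggs": {
    "unique_account_count": {
      "cardinality": {
        "field": "account_id"
      }
    }
  }
}
--------------------------------------------------

The returned count is approximate, but it is usually close enough to choose a workable `num_partitions`.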
+If we have a circuit-breaker error, we are trying to do too much in one request and must increase `num_partitions`. +If the request was successful but the last account ID in the date-sorted test response was still an account we might want to +expire, then we may be missing accounts of interest and have set our numbers too low. We must either + +* increase the `size` parameter to return more results per partition (could be heavy on memory) or +* increase the `num_partitions` to consider fewer accounts per request (could increase overall processing time as we need to make more requests) + +Ultimately, this is a balancing act between managing the Elasticsearch resources required to process a single request and the volume +of requests that the client application must issue to complete a task. + ==== Multi-field terms aggregation The `terms` aggregation does not support collecting terms from multiple fields