Aggregations - support for partitioning the set of terms used in aggregations so that multiple requests can be done without trying to compute everything in one request.

Closes #21487
markharwood committed Nov 24, 2016
1 parent ee74b76 commit d78ae86
Showing 6 changed files with 348 additions and 6 deletions.
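The headline feature: a terms aggregation can now consume one slice ("partition") of the term universe per request. Below is a minimal usage sketch, not part of the commit itself, modeled on the integration test added at the bottom of this page; the index name "idx" and the field name are placeholders.

import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;

import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;

class PartitionedTermsExample {
    Set<String> collectAllTerms(Client client, String field, int numPartitions) {
        Set<String> allTerms = new HashSet<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            // Each request only computes buckets for terms hashing to this partition
            SearchResponse response = client.prepareSearch("idx")
                    .setSize(0)
                    .addAggregation(terms("terms").field(field)
                            .includeExclude(new IncludeExclude(partition, numPartitions)))
                    .get();
            Terms result = response.getAggregations().get("terms");
            for (Terms.Bucket bucket : result.getBuckets()) {
                allTerms.add(bucket.getKeyAsString());
            }
        }
        return allTerms;
    }
}

Because every term hashes into exactly one partition, unioning the per-partition buckets recovers the complete term set; the test added in this commit asserts exactly that.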
@@ -131,7 +131,10 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// to be unbounded and most instances may only aggregate few
// documents, so use hashed based
// global ordinals to keep the bucket ords dense.
-if (Aggregator.descendsFromBucketAggregator(parent)) {
+// Additionally, if using partitioned terms the regular global
+// ordinals would be sparse so we opt for hash
+if (Aggregator.descendsFromBucketAggregator(parent) ||
+(includeExclude != null && includeExclude.isPartitionBased())) {
execution = ExecutionMode.GLOBAL_ORDINALS_HASH;
} else {
if (factories == AggregatorFactories.EMPTY) {
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.terms.support;

import com.carrotsearch.hppc.BitMixer;
import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;

@@ -35,6 +36,7 @@
import org.apache.lucene.util.automaton.Operations;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -46,6 +48,7 @@
import org.elasticsearch.search.DocValueFormat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -61,15 +64,34 @@ public class IncludeExclude implements Writeable, ToXContent {
private static final ParseField INCLUDE_FIELD = new ParseField("include");
private static final ParseField EXCLUDE_FIELD = new ParseField("exclude");
private static final ParseField PATTERN_FIELD = new ParseField("pattern");
private static final ParseField PARTITION_FIELD = new ParseField("partition");
private static final ParseField NUM_PARTITIONS_FIELD = new ParseField("num_partitions");

// The includeValue and excludeValue BytesRefs that result from parsing
// are converted into a LongFilter when used on numeric fields
// in the index.
-public static class LongFilter {
+public abstract static class LongFilter {
public abstract boolean accept(long value);

}

public class PartitionedLongFilter extends LongFilter {
private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

@Override
public boolean accept(long value) {
// hash the value to keep even distributions
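// and use floorMod (rather than %) so negative hashes still select a valid partition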
final long hashCode = BitMixer.mix64(value);
return Math.floorMod(hashCode, incNumPartitions) == incZeroBasedPartition;
}
}


public static class SetBackedLongFilter extends LongFilter {
private LongSet valids;
private LongSet invalids;

-private LongFilter(int numValids, int numInvalids) {
+private SetBackedLongFilter(int numValids, int numInvalids) {
if (numValids > 0) {
valids = new LongHashSet(numValids);
}
@@ -96,6 +118,13 @@ public abstract static class StringFilter {
public abstract boolean accept(BytesRef value);
}

class PartitionedStringFilter extends StringFilter {
@Override
public boolean accept(BytesRef value) {
return Math.floorMod(value.hashCode(), incNumPartitions) == incZeroBasedPartition;
}
}

static class AutomatonBackedStringFilter extends StringFilter {

private final ByteRunAutomaton runAutomaton;
@@ -138,6 +167,25 @@ public abstract static class OrdinalsFilter {

}

class PartitionedOrdinalsFilter extends OrdinalsFilter {

@Override
public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws IOException {
final long numOrds = globalOrdinals.getValueCount();
final LongBitSet acceptedGlobalOrdinals = new LongBitSet(numOrds);
final TermsEnum termEnum = globalOrdinals.termsEnum();
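// Visit each term once, marking the global ordinals whose term hash lands in the requested partition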

BytesRef term = termEnum.next();
while (term != null) {
if (Math.floorMod(term.hashCode(), incNumPartitions) == incZeroBasedPartition) {
acceptedGlobalOrdinals.set(termEnum.ord());
}
term = termEnum.next();
}
return acceptedGlobalOrdinals;
}
}

static class AutomatonBackedOrdinalsFilter extends OrdinalsFilter {

private final CompiledAutomaton compiled;
@@ -205,6 +253,8 @@ public LongBitSet acceptedGlobalOrdinals(RandomAccessOrds globalOrdinals) throws

private final RegExp include, exclude;
private final SortedSet<BytesRef> includeValues, excludeValues;
private final int incZeroBasedPartition;
private final int incNumPartitions;

/**
* @param include The regular expression pattern for the terms to be included
@@ -218,6 +268,8 @@ public IncludeExclude(RegExp include, RegExp exclude) {
this.exclude = exclude;
this.includeValues = null;
this.excludeValues = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
}

public IncludeExclude(String include, String exclude) {
@@ -234,6 +286,8 @@ public IncludeExclude(SortedSet<BytesRef> includeValues, SortedSet<BytesRef> exc
}
this.include = null;
this.exclude = null;
this.incZeroBasedPartition = 0;
this.incNumPartitions = 0;
this.includeValues = includeValues;
this.excludeValues = excludeValues;
}
@@ -250,13 +304,30 @@ public IncludeExclude(long[] includeValues, long[] excludeValues) {
this(convertToBytesRefSet(includeValues), convertToBytesRefSet(excludeValues));
}

public IncludeExclude(int partition, int numPartitions) {
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException("Partition must be >= 0 and < numPartitions, which is " + numPartitions);
}
this.incZeroBasedPartition = partition;
this.incNumPartitions = numPartitions;
this.include = null;
this.exclude = null;
this.includeValues = null;
this.excludeValues = null;
}

/**
* Read from a stream.
*/
public IncludeExclude(StreamInput in) throws IOException {
if (in.readBoolean()) {
includeValues = null;
excludeValues = null;
incZeroBasedPartition = 0;
incNumPartitions = 0;
String includeString = in.readOptionalString();
include = includeString == null ? null : new RegExp(includeString);
String excludeString = in.readOptionalString();
@@ -283,6 +354,13 @@ public IncludeExclude(StreamInput in) throws IOException {
} else {
excludeValues = null;
}
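// Partition settings only exist on the wire from 5.2 onwards; reads from older nodes default to no partitioning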
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
incNumPartitions = in.readVInt();
incZeroBasedPartition = in.readVInt();
} else {
incNumPartitions = 0;
incZeroBasedPartition = 0;
}
}

@Override
@@ -309,6 +387,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(value);
}
}
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
out.writeVInt(incNumPartitions);
out.writeVInt(incZeroBasedPartition);
}
}
}

@@ -436,11 +518,26 @@ public boolean token(String currentFieldName, XContentParser.Token token, XConte
if (token == XContentParser.Token.START_OBJECT) {
if (parseFieldMatcher.match(currentFieldName, INCLUDE_FIELD)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {

// This "include":{"pattern":"foo.*"} syntax is undocumented since 2.0
// Regexes should be "include":"foo.*"
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (parseFieldMatcher.match(currentFieldName, PATTERN_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.text());
} else {
throw new ElasticsearchParseException(
"Unknown string parameter in Include/Exclude clause: " + currentFieldName);
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (parseFieldMatcher.match(currentFieldName, NUM_PARTITIONS_FIELD)) {
otherOptions.put(NUM_PARTITIONS_FIELD, parser.intValue());
} else if (parseFieldMatcher.match(currentFieldName, PARTITION_FIELD)) {
otherOptions.put(INCLUDE_FIELD, parser.intValue());
} else {
throw new ElasticsearchParseException(
"Unknown numeric parameter in Include/Exclude clause: " + currentFieldName);
}
}
}
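Put together, the clauses parsed above let a request express a partition inside the include object, along these lines (the aggregation and field names here are hypothetical):

"aggs": {
  "accounts": {
    "terms": {
      "field": "account_id",
      "include": {
        "partition": 0,
        "num_partitions": 20
      }
    }
  }
}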
@@ -480,15 +577,43 @@ private Set<BytesRef> parseArrayToSet(XContentParser parser) throws IOException
public IncludeExclude createIncludeExclude(Map<ParseField, Object> otherOptions) {
Object includeObject = otherOptions.get(INCLUDE_FIELD);
String include = null;
int partition = -1;
int numPartitions = -1;
SortedSet<BytesRef> includeValues = null;
if (includeObject != null) {
if (includeObject instanceof String) {
include = (String) includeObject;
} else if (includeObject instanceof SortedSet) {
includeValues = (SortedSet<BytesRef>) includeObject;
} else if (includeObject instanceof Integer) {
partition = (Integer) includeObject;
Object numPartitionsObject = otherOptions.get(NUM_PARTITIONS_FIELD);
if (numPartitionsObject instanceof Integer) {
numPartitions = (Integer) numPartitionsObject;
if (numPartitions < 2) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " must be >1");
}
if (partition < 0 || partition >= numPartitions) {
throw new IllegalArgumentException(
PARTITION_FIELD.getPreferredName() + " must be >=0 and <" + numPartitions);
}
} else {
if (numPartitionsObject == null) {
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " parameter is missing");
}
throw new IllegalArgumentException(NUM_PARTITIONS_FIELD.getPreferredName() + " value must be an integer");
}
}
}
Object excludeObject = otherOptions.get(EXCLUDE_FIELD);
if (numPartitions > 0) {
if (excludeObject != null) {
throw new IllegalArgumentException("Partitioned Include cannot be used in combination with excludes");
}
return new IncludeExclude(partition, numPartitions);
}

String exclude = null;
SortedSet<BytesRef> excludeValues = null;
if (excludeObject != null) {
@@ -517,6 +642,10 @@ public boolean isRegexBased() {
return include != null || exclude != null;
}

public boolean isPartitionBased() {
return incNumPartitions > 0;
}

private Automaton toAutomaton() {
Automaton a = null;
if (include != null) {
@@ -538,6 +667,9 @@ public StringFilter convertToStringFilter(DocValueFormat format) {
if (isRegexBased()) {
return new AutomatonBackedStringFilter(toAutomaton());
}
if (isPartitionBased()) {
return new PartitionedStringFilter();
}
return new TermListBackedStringFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}

Expand All @@ -559,13 +691,22 @@ public OrdinalsFilter convertToOrdinalsFilter(DocValueFormat format) {
if (isRegexBased()) {
return new AutomatonBackedOrdinalsFilter(toAutomaton());
}
if (isPartitionBased()) {
return new PartitionedOrdinalsFilter();
}

return new TermListBackedOrdinalsFilter(parseForDocValues(includeValues, format), parseForDocValues(excludeValues, format));
}

public LongFilter convertToLongFilter(DocValueFormat format) {

if (isPartitionBased()) {
return new PartitionedLongFilter();
}

int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
-LongFilter result = new LongFilter(numValids, numInvalids);
+SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
result.addAccept(format.parseLong(val.utf8ToString(), false, null));
@@ -580,9 +721,13 @@ public LongFilter convertToLongFilter(DocValueFormat format) {
}

public LongFilter convertToDoubleFilter() {
if (isPartitionBased()) {
return new PartitionedLongFilter();
}

int numValids = includeValues == null ? 0 : includeValues.size();
int numInvalids = excludeValues == null ? 0 : excludeValues.size();
-LongFilter result = new LongFilter(numValids, numInvalids);
+SetBackedLongFilter result = new SetBackedLongFilter(numValids, numInvalids);
if (includeValues != null) {
for (BytesRef val : includeValues) {
double dval = Double.parseDouble(val.utf8ToString());
@@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.max.Max;
@@ -48,10 +49,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -359,6 +362,43 @@ private void testIncludeExcludeResults(double[] includes, double[] excludes, dou
assertThat(bucket.getDocCount(), equalTo(1L));
}
}

public void testSingleValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(SINGLE_VALUED_FIELD_NAME);
}

public void testMultiValueFieldWithPartitionedFiltering() throws Exception {
runTestFieldWithPartitionedFiltering(MULTI_VALUED_FIELD_NAME);
}

private void runTestFieldWithPartitionedFiltering(String field) throws Exception {
// Find total number of unique terms
SearchResponse allResponse = client().prepareSearch("idx").setTypes("type")
.addAggregation(terms("terms").field(field).size(10000).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(allResponse);
Terms terms = allResponse.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
int expectedCardinality = terms.getBuckets().size();

// Gather terms using partitioned aggregations
final int numPartitions = randomIntBetween(2, 4);
Set<Number> foundTerms = new HashSet<>();
for (int partition = 0; partition < numPartitions; partition++) {
SearchResponse response = client().prepareSearch("idx").setTypes("type").addAggregation(terms("terms").field(field)
.includeExclude(new IncludeExclude(partition, numPartitions)).collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
for (Bucket bucket : terms.getBuckets()) {
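// add() returns false for duplicates, so this also asserts that the partitions are disjoint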
assertTrue(foundTerms.add(bucket.getKeyAsNumber()));
}
}
assertEquals(expectedCardinality, foundTerms.size());
}

public void testSingleValueFieldOrderedByTermAsc() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")