Terms aggregations: make size=0 return all terms.
Terms aggregations return up to `size` terms, so up to now the only way to get all
matching terms back was to set `size` to an arbitrarily high number that was
larger than the number of unique terms.

Terms aggregators already made sure not to allocate memory based on the `size`
parameter, so this commit mostly consists of making `0` an alias for the
maximum integer value in the TermsParser.

Close #4837
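
For illustration, this is what the new behaviour looks like through the Java test helpers used in the diffs below (a sketch only; "idx", "value" and the aggregation name are placeholders):

SearchResponse response = client().prepareSearch("idx")
        .addAggregation(terms("terms")      // static import of AggregationBuilders.terms
                .field("value")
                .size(0))                   // 0 is now an alias for Integer.MAX_VALUE
        .execute().actionGet();
Terms terms = response.getAggregations().get("terms");
// terms.buckets() now holds every unique term, not just the top `size`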
jpountz committed Jan 22, 2014
1 parent 21897fd commit 8b0a863
Showing 10 changed files with 97 additions and 12 deletions.
@@ -65,6 +65,8 @@ the client.
NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will
override it and reset it to be equal to `size`.

added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this
on high-cardinality fields as this will kill both your CPU, since terms need to be returned sorted, and your network.

==== Order

@@ -167,7 +167,7 @@ public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
- this.requiredSize = in.readVInt();
+ this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -183,7 +183,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
- out.writeVInt(requiredSize);
+ writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -21,6 +21,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -29,6 +31,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;

import java.io.IOException;
import java.util.*;

/**
@@ -185,4 +188,17 @@ final void trimExcessEntries() {
buckets = newBuckets;
}

// 0 actually means unlimited
protected static int readSize(StreamInput in) throws IOException {
final int size = in.readVInt();
return size == 0 ? Integer.MAX_VALUE : size;
}

protected static void writeSize(int size, StreamOutput out) throws IOException {
if (size == Integer.MAX_VALUE) {
size = 0;
}
out.writeVInt(size);
}

}
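
A side note on the wire format above: writeSize maps Integer.MAX_VALUE back to 0 so that the serialized value stays a one-byte vInt (and so readSize/writeSize round-trip symmetrically), since variable-length ints carry 7 payload bits per byte. A standalone sketch of that arithmetic (illustrative, not part of the commit):

// Number of bytes a value occupies as a vInt (7 payload bits per byte).
static int vIntLength(int value) {
    int bytes = 1;
    while ((value & ~0x7F) != 0) {  // more than 7 significant bits remain
        value >>>= 7;
        bytes++;
    }
    return bytes;
}
// vIntLength(0) == 1, vIntLength(Integer.MAX_VALUE) == 5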
@@ -164,7 +164,7 @@ public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
this.valueFormatter = ValueFormatterStreams.readOptional(in);
- this.requiredSize = in.readVInt();
+ this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -180,7 +180,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
ValueFormatterStreams.writeOptional(valueFormatter, out);
- out.writeVInt(requiredSize);
+ writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -96,7 +96,7 @@ public Type type()
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
- this.requiredSize = in.readVInt();
+ this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
int size = in.readVInt();
List<InternalTerms.Bucket> buckets = new ArrayList<InternalTerms.Bucket>(size);
@@ -111,7 +111,7 @@ public void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
- out.writeVInt(requiredSize);
+ writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVInt(buckets.size());
for (InternalTerms.Bucket bucket : buckets) {
@@ -176,6 +176,14 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se
}
}

if (shardSize == 0) {
shardSize = Integer.MAX_VALUE;
}

if (requiredSize == 0) {
requiredSize = Integer.MAX_VALUE;
}

// shard_size cannot be smaller than size as we need to fetch at least <size> entries from every shard in order to return <size>
if (shardSize < requiredSize) {
shardSize = requiredSize;
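
Taken together, the new 0-aliasing and the pre-existing clamp amount to the following standalone paraphrase (the method name is an assumption, not commit code):

static int[] normalizeSizes(int requiredSize, int shardSize) {
    // 0 is an alias for "unbounded", for both parameters
    if (shardSize == 0) shardSize = Integer.MAX_VALUE;
    if (requiredSize == 0) requiredSize = Integer.MAX_VALUE;
    // shard_size may never be smaller than size
    if (shardSize < requiredSize) shardSize = requiredSize;
    return new int[] { requiredSize, shardSize };
}

Because the aliasing runs before the clamp, size=0 with an explicit small shard_size still fetches all terms from every shard.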
@@ -66,7 +66,7 @@ public Type type()
public void readFrom(StreamInput in) throws IOException {
this.name = in.readString();
this.order = InternalOrder.Streams.readOrder(in);
- this.requiredSize = in.readVInt();
+ this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.buckets = BUCKETS;
this.bucketMap = BUCKETS_MAP;
@@ -76,7 +76,7 @@ public void writeTo(StreamOutput out) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
InternalOrder.Streams.writeOrder(order, out);
- out.writeVInt(requiredSize);
+ writeSize(requiredSize, out);
out.writeVLong(minDocCount);
}

@@ -64,15 +64,15 @@ public Settings indexSettings() {
@Before
public void init() throws Exception {
createIndex("idx");

IndexRequestBuilder[] lowcardBuilders = new IndexRequestBuilder[NUM_DOCS];
for (int i = 0; i < lowcardBuilders.length; i++) {
lowcardBuilders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder()
.startObject()
.field("value", (double) i)
.startArray("values").value((double)i).value(i + 1d).endArray()
.endObject());

}
indexRandom(randomBoolean(), lowcardBuilders);
IndexRequestBuilder[] highCardBuilders = new IndexRequestBuilder[100]; // TODO: randomize the size?
@@ -89,6 +89,24 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -550,7 +568,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();

assertSearchResponse(response);
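
For scale on the "2GB of memory per shard" comment in the test above: Integer.MAX_VALUE is 2^31 - 1, so any per-bucket structure pre-sized from `size` would run to gigabytes per shard even at one byte per slot; the tests assert the aggregator sizes its structures from the terms actually collected instead. Rough arithmetic (illustrative only):

long slots = Integer.MAX_VALUE;            // 2_147_483_647
double gib = slots / (double) (1 << 30);   // ~2.0, i.e. ~2 GiB at one byte per slot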
@@ -87,6 +87,24 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.field("value")
.minDocCount(randomInt(1))
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(100));
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -544,7 +562,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws
public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.field("value"))
.field("value")
.size(randomInt(5)))
.execute().actionGet();

assertSearchResponse(response);
@@ -97,6 +97,26 @@ public void init() throws Exception {
ensureSearchable();
}

@Test
// the main purpose of this test is to make sure we're not allocating 2GB of memory per shard
public void sizeIsZero() {
final int minDocCount = randomInt(1);
SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.field("value")
.minDocCount(minDocCount)
.size(0))
.execute().actionGet();

assertSearchResponse(response);

Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
assertThat(terms.buckets().size(), equalTo(minDocCount == 0 ? 105 : 100)); // 105 because of the other type
}

@Test
public void singleValueField() throws Exception {
SearchResponse response = client().prepareSearch("idx").setTypes("type")
@@ -686,6 +706,7 @@ public void unmapped() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.size(randomInt(5))
.field("value"))
.execute().actionGet();
