From 8b0a863d427b4ebcbcfb1dcd69c996c52e7ae05e Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Tue, 21 Jan 2014 23:00:19 +0100 Subject: [PATCH] Terms aggregations: make size=0 return all terms. Terms aggregations return up to `size` terms, so up to now, the way to get all matching terms back was to set `size` to an arbitrarily high number that would be larger than the number of unique terms. Terms aggregators already made sure to not allocate memory based on the `size` parameter, so this commit mostly consists of making `0` an alias for the maximum integer value in the TermsParser. Close #4837 --- .../bucket/terms-aggregation.asciidoc | 2 ++ .../bucket/terms/DoubleTerms.java | 4 +-- .../bucket/terms/InternalTerms.java | 16 ++++++++++++ .../aggregations/bucket/terms/LongTerms.java | 4 +-- .../bucket/terms/StringTerms.java | 4 +-- .../bucket/terms/TermsParser.java | 8 ++++++ .../bucket/terms/UnmappedTerms.java | 4 +-- .../aggregations/bucket/DoubleTermsTests.java | 25 ++++++++++++++++--- .../aggregations/bucket/LongTermsTests.java | 21 +++++++++++++++- .../aggregations/bucket/StringTermsTests.java | 21 ++++++++++++++++ 10 files changed, 97 insertions(+), 12 deletions(-) diff --git a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc index 0e08fcff5fc8f..0cc3582030cf3 100644 --- a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc @@ -65,6 +65,8 @@ the client. NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, elasticsearch will override it and reset it to be equal to `size`. +added[1.1.0] It is possible to not limit the number of terms that are returned by setting `size` to `0`. Don't use this +on high-cardinality fields as this will kill both your CPU (terms need to be returned sorted) and your network. 
==== Order diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index de379f6ac6d09..b30dac7a91e8e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -167,7 +167,7 @@ public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -183,7 +183,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index f4d0f5a650a90..c2717dfac9bd9 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -21,6 +21,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.cache.recycler.CacheRecycler; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.ToXContent; @@ -29,6 +31,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue; +import java.io.IOException; import java.util.*; /** @@ -185,4 +188,17 @@ final void trimExcessEntries() { buckets = newBuckets; } + // 0 actually means unlimited + protected static int readSize(StreamInput in) throws IOException { + final int size = in.readVInt(); + return size == 0 ? 
Integer.MAX_VALUE : size; + } + + protected static void writeSize(int size, StreamOutput out) throws IOException { + if (size == Integer.MAX_VALUE) { + size = 0; + } + out.writeVInt(size); + } + } diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 37600f81cdea6..0bdd68ab75030 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -164,7 +164,7 @@ public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); this.valueFormatter = ValueFormatterStreams.readOptional(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -180,7 +180,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); ValueFormatterStreams.writeOptional(valueFormatter, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index f68cbe02df487..4d50e5c4a786e 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -96,7 +96,7 @@ public Type type() { public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); int size = in.readVInt(); List buckets = new ArrayList(size); @@ -111,7 +111,7 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); out.writeVInt(buckets.size()); for (InternalTerms.Bucket bucket : buckets) { diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java index 3634feb7e46cb..1bc08876dc42b 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsParser.java @@ -176,6 +176,14 @@ public AggregatorFactory parse(String aggregationName, XContentParser parser, Se } } + if (shardSize == 0) { + shardSize = Integer.MAX_VALUE; + } + + if (requiredSize == 0) { + requiredSize = Integer.MAX_VALUE; + } + // shard_size cannot be smaller than size as we need to at least fetch entries from every shards in order to return if (shardSize < requiredSize) { shardSize = requiredSize; diff --git a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index bf04f3724a648..d86c7baf446d8 100644 --- 
a/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -66,7 +66,7 @@ public Type type() { public void readFrom(StreamInput in) throws IOException { this.name = in.readString(); this.order = InternalOrder.Streams.readOrder(in); - this.requiredSize = in.readVInt(); + this.requiredSize = readSize(in); this.minDocCount = in.readVLong(); this.buckets = BUCKETS; this.bucketMap = BUCKETS_MAP; @@ -76,7 +76,7 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(name); InternalOrder.Streams.writeOrder(order, out); - out.writeVInt(requiredSize); + writeSize(requiredSize, out); out.writeVLong(minDocCount); } diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java index 54b0baa83926b..c894d6aaf5f31 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/DoubleTermsTests.java @@ -64,7 +64,7 @@ public Settings indexSettings() { @Before public void init() throws Exception { createIndex("idx"); - + IndexRequestBuilder[] lowcardBuilders = new IndexRequestBuilder[NUM_DOCS]; for (int i = 0; i < lowcardBuilders.length; i++) { lowcardBuilders[i] = client().prepareIndex("idx", "type").setSource(jsonBuilder() @@ -72,7 +72,7 @@ public void init() throws Exception { .field("value", (double) i) .startArray("values").value((double)i).value(i + 1d).endArray() .endObject()); - + } indexRandom(randomBoolean(), lowcardBuilders); IndexRequestBuilder[] highCardBuilders = new IndexRequestBuilder[100]; // TODO: randomize the size? 
@@ -89,6 +89,24 @@ public void init() throws Exception { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .field("value") + .minDocCount(randomInt(1)) + .size(0)) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(100)); + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -550,7 +568,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws public void unmapped() throws Exception { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") - .field("value")) + .field("value") + .size(randomInt(5))) .execute().actionGet(); assertSearchResponse(response); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java index c480a553171a9..0c5db790b8a5b 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/LongTermsTests.java @@ -87,6 +87,24 @@ public void init() throws Exception { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .field("value") + .minDocCount(randomInt(1)) + .size(0)) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(100)); + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -544,7 +562,8 @@ public void script_MultiValued_WithAggregatorInherited_WithExplicitType() throws public void unmapped() throws Exception { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") - .field("value")) + .field("value") + .size(randomInt(5))) .execute().actionGet(); assertSearchResponse(response); diff --git a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java index 3adb08ffef004..e78b15bdfc2bc 100644 --- a/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java +++ b/src/test/java/org/elasticsearch/search/aggregations/bucket/StringTermsTests.java @@ -97,6 +97,26 @@ public void init() throws Exception { ensureSearchable(); } + @Test + // the main purpose of this test is to make sure we're not allocating 2GB of memory per shard + public void sizeIsZero() { + final int minDocCount = randomInt(1); + SearchResponse response = client().prepareSearch("idx").setTypes("high_card_type") + .addAggregation(terms("terms") + .executionHint(randomExecutionHint()) + .field("value") + .minDocCount(minDocCount) + 
.size(0)) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.buckets().size(), equalTo(minDocCount == 0 ? 105 : 100)); // 105 because of the other type + } + @Test public void singleValueField() throws Exception { SearchResponse response = client().prepareSearch("idx").setTypes("type") @@ -686,6 +706,7 @@ public void unmapped() throws Exception { SearchResponse response = client().prepareSearch("idx_unmapped").setTypes("type") .addAggregation(terms("terms") .executionHint(randomExecutionHint()) + .size(randomInt(5)) .field("value")) .execute().actionGet();
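
To make the mechanism in this patch easier to follow outside the diff, below is a minimal, self-contained Java sketch of the `size=0` aliasing it introduces. This is not Elasticsearch code: the class and method names are made up for illustration, and it only mirrors the logic added to `TermsParser` and to the `readSize`/`writeSize` helpers in `InternalTerms`. A requested size of `0` is promoted to `Integer.MAX_VALUE` internally, and mapped back to `0` when serialized, so the "unlimited" case remains compact on the wire.

// Hypothetical standalone sketch (not part of the patch) of the size=0 alias.
// TermsParser promotes a requested size of 0 to Integer.MAX_VALUE, and the
// InternalTerms stream helpers map that sentinel back to 0 on serialization.
public final class SizeAliasSketch {

    // Mirrors the TermsParser change: 0 means "return every term".
    static int resolveRequestedSize(int requestedSize) {
        return requestedSize == 0 ? Integer.MAX_VALUE : requestedSize;
    }

    // Mirrors InternalTerms.writeSize: Integer.MAX_VALUE is encoded as 0.
    static int encodeSizeForWire(int size) {
        return size == Integer.MAX_VALUE ? 0 : size;
    }

    public static void main(String[] args) {
        int parsed = resolveRequestedSize(0);       // Integer.MAX_VALUE
        int onWire = encodeSizeForWire(parsed);     // 0
        int decoded = resolveRequestedSize(onWire); // Integer.MAX_VALUE again
        System.out.println(parsed + " -> " + onWire + " -> " + decoded);
    }
}

Encoding the unlimited case as `0` rather than `Integer.MAX_VALUE` keeps the serialized value at a single byte under the variable-length encoding used by `writeVInt`, and, as the commit message notes, the aggregators never allocate memory proportional to `size`, which is why the alias is safe to use.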