From 15b7de00f2ff75587560ac6e38ffb20da030178e Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 9 Aug 2023 12:21:04 -0700 Subject: [PATCH] Do not evaluate shard_size and shard_min_doc_count at segment slice level (#9085) * Use BucketCountThresholds in InternalTerms and InternalAggregations and do not apply shard level thresholds at slice level for Concurrent Segment Search Signed-off-by: Jay Deng * Addressing comments Signed-off-by: Jay Deng * Re-introduce shardSize member to InternalMultiTerms and InternalMappedTerms Signed-off-by: Jay Deng * Introduce LocalBucketCountThresholds for local size and min_doc_count values Signed-off-by: Jay Deng --------- Signed-off-by: Jay Deng --- CHANGELOG.md | 3 +- .../aggregations/TermsReduceBenchmark.java | 6 +-- .../StringTermsSerializationBenchmark.java | 6 +-- .../AggregationCollectorManager.java | 19 +++---- .../GlobalAggCollectorManager.java | 10 ++++ .../aggregations/InternalAggregation.java | 14 ++++++ .../NonGlobalAggCollectorManager.java | 10 ++++ .../bucket/LocalBucketCountThresholds.java | 36 ++++++++++++++ .../terms/AbstractStringTermsAggregator.java | 10 ++-- .../bucket/terms/DoubleTerms.java | 20 +++----- .../GlobalOrdinalsStringTermsAggregator.java | 20 ++++---- .../terms/InternalMappedSignificantTerms.java | 7 ++- .../bucket/terms/InternalMappedTerms.java | 7 ++- .../bucket/terms/InternalMultiTerms.java | 17 +++---- .../terms/InternalSignificantTerms.java | 29 +++++++++-- .../bucket/terms/InternalTerms.java | 32 ++++++++---- .../aggregations/bucket/terms/LongTerms.java | 25 ++++------ .../terms/MapStringTermsAggregator.java | 16 +++--- .../bucket/terms/MultiTermsAggregator.java | 16 +++--- .../bucket/terms/NumericTermsAggregator.java | 49 +++++++++---------- .../bucket/terms/SignificantLongTerms.java | 17 +++---- .../bucket/terms/SignificantStringTerms.java | 17 +++---- .../SignificantTermsAggregatorFactory.java | 7 +-- .../bucket/terms/StringTerms.java | 20 +++----- .../bucket/terms/TermsAggregator.java | 25 ++++++++++ .../bucket/terms/TermsAggregatorFactory.java | 8 +-- .../terms/UnmappedSignificantTerms.java | 12 +++-- .../bucket/terms/UnmappedTerms.java | 13 +++-- .../bucket/terms/UnsignedLongTerms.java | 25 ++++------ .../search/internal/SearchContext.java | 14 ++++++ .../InternalAggregationsTests.java | 11 ++--- .../InternalMultiBucketAggregationTests.java | 12 ++--- .../bucket/terms/DoubleTermsTests.java | 18 ++++--- .../bucket/terms/InternalMultiTermsTests.java | 11 +++-- .../InternalSignificantTermsTestCase.java | 2 +- .../bucket/terms/InternalTermsTestCase.java | 2 +- .../bucket/terms/LongTermsTests.java | 18 ++++--- .../terms/SignificanceHeuristicTests.java | 32 +++++++++--- .../terms/SignificantLongTermsTests.java | 19 +++++-- .../terms/SignificantStringTermsTests.java | 12 ++--- .../bucket/terms/StringTermsTests.java | 18 ++++--- .../bucket/terms/UnsignedLongTermsTests.java | 18 ++++--- .../aggregations/AggregatorTestCase.java | 1 + 43 files changed, 411 insertions(+), 273 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 63b962a8357ec..3799b8a2db9bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801)) - [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005)) - Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047)) +- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085)) ### Deprecated @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java index b18ea4327cbc2..180676e7b2ed2 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/TermsReduceBenchmark.java @@ -57,6 +57,7 @@ import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.terms.StringTerms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.query.QuerySearchResult; @@ -170,15 +171,14 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) { "terms", BucketOrder.key(true), BucketOrder.count(false), - topNSize, - 1, Collections.emptyMap(), DocValueFormat.RAW, numShards, true, 0, buckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards) ); } diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java index 8f86a0f3afbc6..8d528d94e463e 100644 --- a/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java +++ b/benchmarks/src/main/java/org/opensearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java @@ -51,6 +51,7 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import java.util.ArrayList; import java.util.List; @@ -86,15 +87,14 @@ private StringTerms newTerms(boolean withNested) { "test", BucketOrder.key(true), BucketOrder.key(true), - buckets, - 1, null, DocValueFormat.RAW, buckets, false, 100000, resultBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets) ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 1f60ff6503ca8..ae06f80516e3e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -26,7 +26,7 @@ * aggregation operators */ class AggregationCollectorManager implements CollectorManager { - private final SearchContext context; + protected final SearchContext context; private final CheckedFunction, IOException> aggProvider; private final String collectorReason; @@ -63,18 +63,11 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO } final InternalAggregations internalAggregations = InternalAggregations.from(internals); - // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices - // were created to execute this request and it used concurrent segment search path - // TODO: Add the check for flag that the request was executed using concurrent search - if (collectors.size() > 1) { - // using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all - // documents are collected across shards for an aggregation - return new AggregationReduceableSearchResult( - InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) - ); - } else { - return new AggregationReduceableSearchResult(internalAggregations); - } + return buildAggregationResult(internalAggregations); + } + + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + return new AggregationReduceableSearchResult(internalAggregations); } static Collector createCollector(SearchContext context, List collectors, String reason) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java index 56f53a57a8573..41e8aba895480 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java @@ -14,6 +14,7 @@ import org.opensearch.search.profile.query.CollectorResult; import java.io.IOException; +import java.util.Collections; import java.util.Objects; /** @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException { return super.newCollector(); } } + + @Override + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices + // were created so that we can apply shard level bucket count thresholds in the reduce phase. + return new AggregationReduceableSearchResult( + InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) + ); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java index 999ed458f2388..d1de09cffd674 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java @@ -40,6 +40,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.aggregations.support.AggregationPath; @@ -160,6 +162,18 @@ public boolean isSliceLevel() { return this.isSliceLevel; } + /** + * For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits + * whereas for coordinator level partial reduce it will use top level `size` and `min_doc_count` + */ + public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + if (isSliceLevel()) { + return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); + } else { + return new LocalBucketCountThresholds(bucketCountThresholds.getMinDocCount(), bucketCountThresholds.getRequiredSize()); + } + } + public BigArrays bigArrays() { return bigArrays; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java index 3729734c48ed7..984eefb9b52a4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java @@ -14,6 +14,7 @@ import org.opensearch.search.profile.query.CollectorResult; import java.io.IOException; +import java.util.Collections; import java.util.Objects; /** @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException { return super.newCollector(); } } + + @Override + protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) { + // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices + // were created so that we can apply shard level bucket count thresholds in the reduce phase. + return new AggregationReduceableSearchResult( + InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) + ); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java new file mode 100644 index 0000000000000..98dc07cec49ab --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/LocalBucketCountThresholds.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; + +/** + * BucketCountThresholds type that holds the local (either shard level or request level) bucket count thresholds in minDocCount and requireSize fields. + * Similar to {@link TermsAggregator.BucketCountThresholds} however only provides getters for the local members and no setters. + * + * @opensearch.internal + */ +public class LocalBucketCountThresholds { + + private final long minDocCount; + private final int requiredSize; + + public LocalBucketCountThresholds(long localminDocCount, int localRequiredSize) { + this.minDocCount = localminDocCount; + this.requiredSize = localRequiredSize; + } + + public int getRequiredSize() { + return requiredSize; + } + + public long getMinDocCount() { + return minDocCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index 9551be10e52b8..d06a0ed9976fc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -76,15 +76,14 @@ protected StringTerms buildEmptyTermsAggregation() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), - 0 + 0, + bucketCountThresholds ); } @@ -95,14 +94,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs int supersetSize = topReader.numDocs(); return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSize, supersetSize, significanceHeuristic, - emptyList() + emptyList(), + bucketCountThresholds ); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java index 0b76c302801af..de02d5a938644 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -130,29 +130,27 @@ public DoubleTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -174,15 +172,14 @@ public DoubleTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -204,15 +201,14 @@ protected DoubleTerms create(String name, List buckets, BucketOrder redu name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index e0a22435b8f48..53e5bce91ead2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -57,6 +57,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; @@ -603,6 +604,7 @@ abstract class ResultStrategy< TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds); if (valueCount == 0) { // no context in this reader InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { @@ -615,11 +617,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws long[] otherDocCount = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { final int size; - if (bucketCountThresholds.getMinDocCount() == 0) { + if (localBucketCountThresholds.getMinDocCount() == 0) { // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); + size = (int) Math.min(valueCount, localBucketCountThresholds.getRequiredSize()); } else { - size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); + size = (int) Math.min(maxBucketOrd(), localBucketCountThresholds.getRequiredSize()); } PriorityQueue ordered = buildPriorityQueue(size); final int finalOrdIdx = ordIdx; @@ -630,7 +632,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { otherDocCount[finalOrdIdx] += docCount; - if (docCount >= bucketCountThresholds.getShardMinDocCount()) { + if (docCount >= localBucketCountThresholds.getMinDocCount()) { if (spare == null) { spare = buildEmptyTemporaryBucket(); } @@ -799,15 +801,14 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -924,14 +925,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSize(owningBucketOrd), supersetSize, significanceHeuristic, - Arrays.asList(topBuckets) + Arrays.asList(topBuckets), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java index 97a95b8df840b..a7c5427fa38cc 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedSignificantTerms.java @@ -64,16 +64,15 @@ public abstract class InternalMappedSignificantTerms< protected InternalMappedSignificantTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata); + super(name, bucketCountThresholds, metadata); this.format = format; this.buckets = buckets; this.subsetSize = subsetSize; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java index f5e92fec8195d..d542064df24d7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMappedTerms.java @@ -64,17 +64,16 @@ protected InternalMappedTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, reduceOrder, order, requiredSize, minDocCount, metadata); + super(name, reduceOrder, order, bucketCountThresholds, metadata); this.format = format; this.shardSize = shardSize; this.showTermDocCountError = showTermDocCountError; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java index fc84f35385d5c..4d1cbd4ce72f1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTerms.java @@ -233,17 +233,16 @@ public InternalMultiTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, int shardSize, boolean showTermDocCountError, long otherDocCount, long docCountError, List formats, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, reduceOrder, order, requiredSize, minDocCount, metadata); + super(name, reduceOrder, order, bucketCountThresholds, metadata); this.shardSize = shardSize; this.showTermDocCountError = showTermDocCountError; this.otherDocCount = otherDocCount; @@ -278,15 +277,14 @@ public InternalMultiTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, shardSize, showTermDocCountError, otherDocCount, docCountError, termFormats, - buckets + buckets, + bucketCountThresholds ); } @@ -357,15 +355,14 @@ protected InternalMultiTerms create( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, shardSize, showTermDocCountError, otherDocCount, docCountError, termFormats, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 84d148199a7f9..03bb519ed9961 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -39,6 +39,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import java.io.IOException; @@ -195,11 +196,17 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) protected final int requiredSize; protected final long minDocCount; + protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; - protected InternalSignificantTerms(String name, int requiredSize, long minDocCount, Map metadata) { + protected InternalSignificantTerms( + String name, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { super(name, metadata); - this.requiredSize = requiredSize; - this.minDocCount = minDocCount; + this.requiredSize = bucketCountThresholds.getRequiredSize(); + this.minDocCount = bucketCountThresholds.getMinDocCount(); + this.bucketCountThresholds = bucketCountThresholds; } /** @@ -209,6 +216,9 @@ protected InternalSignificantTerms(StreamInput in) throws IOException { super(in); requiredSize = readSize(in); minDocCount = in.readVLong(); + // shardMinDocCount and shardSize are not used on the coordinator, so they are not deserialized. We use + // CoordinatorBucketCountThresholds which will throw an exception if they are accessed. + bucketCountThresholds = new TermsAggregator.CoordinatorBucketCountThresholds(minDocCount, -1, requiredSize, -1); } protected final void doWriteTo(StreamOutput out) throws IOException { @@ -224,6 +234,7 @@ protected final void doWriteTo(StreamOutput out) throws IOException { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds); long globalSubsetSize = 0; long globalSupersetSize = 0; // Compute the overall result set size and the corpus size using the @@ -265,13 +276,21 @@ public InternalAggregation reduce(List aggregations, Reduce } } SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); - final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); + boolean isCoordinatorPartialReduce = reduceContext.isFinalReduce() == false && reduceContext.isSliceLevel() == false; + // Do not apply size threshold on coordinator partial reduce + final int size = !isCoordinatorPartialReduce + ? Math.min(localBucketCountThresholds.getRequiredSize(), buckets.size()) + : buckets.size(); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); final B b = reduceBucket(sameTermBuckets, reduceContext); b.updateScore(heuristic); - if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) { + // For concurrent search case we do not apply bucket count thresholds in buildAggregation and instead is done here during + // reduce. However, the bucket score is only evaluated at the final coordinator reduce. + boolean meetsThresholds = (b.subsetDf >= localBucketCountThresholds.getMinDocCount()) + && (((b.score > 0) || reduceContext.isSliceLevel())); + if (isCoordinatorPartialReduce || meetsThresholds) { B removed = ordered.insertWithOverflow(b); if (removed == null) { reduceContext.consumeBucketsAndMaybeBreak(1); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java index 9a80155eea51c..0be7de5ded32f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java @@ -46,6 +46,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.KeyComparable; import org.opensearch.search.aggregations.bucket.IteratorAndCurrent; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; @@ -223,29 +224,29 @@ public int hashCode() { protected final BucketOrder order; protected final int requiredSize; protected final long minDocCount; + protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; /** * Creates a new {@link InternalTerms} * @param name The name of the aggregation * @param reduceOrder The {@link BucketOrder} that should be used to merge shard results. * @param order The {@link BucketOrder} that should be used to sort the final reduce. - * @param requiredSize The number of top buckets. - * @param minDocCount The minimum number of documents allowed per bucket. + * @param bucketCountThresholds Object containing values for minDocCount, shardMinDocCount, size, shardSize. * @param metadata The metadata associated with the aggregation. */ protected InternalTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, + TermsAggregator.BucketCountThresholds bucketCountThresholds, Map metadata ) { super(name, metadata); this.reduceOrder = reduceOrder; this.order = order; - this.requiredSize = requiredSize; - this.minDocCount = minDocCount; + this.bucketCountThresholds = bucketCountThresholds; + this.requiredSize = bucketCountThresholds.getRequiredSize(); + this.minDocCount = bucketCountThresholds.getMinDocCount(); } /** @@ -257,6 +258,9 @@ protected InternalTerms(StreamInput in) throws IOException { order = InternalOrder.Streams.readOrder(in); requiredSize = readSize(in); minDocCount = in.readVLong(); + // shardMinDocCount and shardSize are not used on the coordinator, so they are not deserialized. We use + // CoordinatorBucketCountThresholds which will throw an exception if they are accessed. + bucketCountThresholds = new TermsAggregator.CoordinatorBucketCountThresholds(minDocCount, -1, requiredSize, getShardSize()); } @Override @@ -385,6 +389,7 @@ private List reduceLegacy(List aggregations, ReduceConte } public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + LocalBucketCountThresholds localBucketCountThresholds = reduceContext.asLocalBucketCountThresholds(bucketCountThresholds); long sumDocCountError = 0; long otherDocCount = 0; InternalTerms referenceTerms = null; @@ -444,8 +449,8 @@ public InternalAggregation reduce(List aggregations, Reduce reducedBuckets = reduceLegacy(aggregations, reduceContext); } final B[] list; - if (reduceContext.isFinalReduce()) { - final int size = Math.min(requiredSize, reducedBuckets.size()); + if (reduceContext.isFinalReduce() || reduceContext.isSliceLevel()) { + final int size = Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size()); // final comparator final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); for (B bucket : reducedBuckets) { @@ -455,7 +460,7 @@ public InternalAggregation reduce(List aggregations, Reduce final long finalSumDocCountError = sumDocCountError; bucket.setDocCountError(docCountError -> docCountError + finalSumDocCountError); } - if (bucket.getDocCount() >= minDocCount) { + if (bucket.getDocCount() >= localBucketCountThresholds.getMinDocCount()) { B removed = ordered.insertWithOverflow(bucket); if (removed != null) { otherDocCount += removed.getDocCount(); @@ -474,7 +479,9 @@ public InternalAggregation reduce(List aggregations, Reduce } else { // we can prune the list on partial reduce if the aggregation is ordered by key // and not filtered (minDocCount == 0) - int size = isKeyOrder(order) && minDocCount == 0 ? Math.min(requiredSize, reducedBuckets.size()) : reducedBuckets.size(); + int size = isKeyOrder(order) && localBucketCountThresholds.getMinDocCount() == 0 + ? Math.min(localBucketCountThresholds.getRequiredSize(), reducedBuckets.size()) + : reducedBuckets.size(); list = createBucketsArray(size); for (int i = 0; i < size; i++) { reduceContext.consumeBucketsAndMaybeBreak(1); @@ -493,6 +500,11 @@ public InternalAggregation reduce(List aggregations, Reduce } else { docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; } + + // Shards must return buckets sorted by key, so we apply the sort here in shard level reduce + if (reduceContext.isSliceLevel()) { + Arrays.sort(list, thisReduceOrder.comparator()); + } return create(name, Arrays.asList(list), reduceContext.isFinalReduce() ? order : thisReduceOrder, docCountError, otherDocCount); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java index 67aa80d0a9879..fe78145dce3e7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java @@ -142,29 +142,27 @@ public LongTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -186,15 +184,14 @@ public LongTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -216,15 +213,14 @@ protected LongTerms create(String name, List buckets, BucketOrder reduce name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -293,15 +289,14 @@ static DoubleTerms convertLongTermsToDouble(LongTerms longTerms, DocValueFormat longTerms.getName(), longTerms.reduceOrder, longTerms.order, - longTerms.requiredSize, - longTerms.minDocCount, longTerms.metadata, longTerms.format, longTerms.shardSize, longTerms.showTermDocCountError, longTerms.otherDocCount, newBuckets, - longTerms.docCountError + longTerms.docCountError, + longTerms.bucketCountThresholds ); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index bcdf1f4480a31..b0d2194cccc84 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -50,6 +50,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic; import org.opensearch.search.aggregations.support.ValuesSource; @@ -244,11 +245,12 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; @@ -257,7 +259,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; } if (spare == null) { @@ -454,15 +456,14 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -572,14 +573,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOE SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) { return new SignificantStringTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSizes.get(owningBucketOrd), supersetSize, significanceHeuristic, - Arrays.asList(topBuckets) + Arrays.asList(topBuckets), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index 9d99c0b90a075..c86302844b730 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -33,6 +33,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.support.AggregationPath; import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.internal.SearchContext; @@ -118,13 +119,14 @@ public MultiTermsAggregator( @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds); InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][]; long[] otherDocCounts = new long[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize()); PriorityQueue ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); InternalMultiTerms.Bucket spare = null; BytesRef dest = null; @@ -136,7 +138,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; } if (spare == null) { @@ -182,15 +184,14 @@ InternalMultiTerms buildResult(long owningBucketOrd, long otherDocCount, Interna name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, 0, formats, - List.of(topBuckets) + List.of(topBuckets), + bucketCountThresholds ); } @@ -200,15 +201,14 @@ public InternalAggregation buildEmptyAggregation() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), bucketCountThresholds.getShardSize(), showTermDocCountError, 0, 0, formats, - Collections.emptyList() + Collections.emptyList(), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index a0265135fe9d3..8bab7ffbbb90f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -52,6 +52,7 @@ import org.opensearch.search.aggregations.InternalOrder; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.LeafBucketCollectorBase; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForLong; @@ -173,13 +174,14 @@ abstract class ResultStrategy ordered = buildPriorityQueue(size); B spare = null; BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); @@ -187,7 +189,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { + if (docCount < localBucketCountThresholds.getMinDocCount()) { continue; } if (spare == null) { @@ -395,15 +397,14 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -413,15 +414,14 @@ LongTerms buildEmptyResult() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), - 0 + 0, + bucketCountThresholds ); } } @@ -477,15 +477,14 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -495,15 +494,14 @@ DoubleTerms buildEmptyResult() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), - 0 + 0, + bucketCountThresholds ); } } @@ -558,15 +556,14 @@ UnsignedLongTerms buildResult(long owningBucketOrd, long otherDocCount, Unsigned name, reduceOrder, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, List.of(topBuckets), - 0 + 0, + bucketCountThresholds ); } @@ -576,15 +573,14 @@ UnsignedLongTerms buildEmptyResult() { name, order, order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), - 0 + 0, + bucketCountThresholds ); } } @@ -670,17 +666,17 @@ void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {} @Override SignificantLongTerms buildResult(long owningBucketOrd, long otherDocCoun, SignificantLongTerms.Bucket[] topBuckets) { - return new SignificantLongTerms( + SignificantLongTerms significantLongTerms = new SignificantLongTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, subsetSizes.get(owningBucketOrd), supersetSize, significanceHeuristic, - List.of(topBuckets) + List.of(topBuckets), + bucketCountThresholds ); + return significantLongTerms; } @Override @@ -691,14 +687,13 @@ SignificantLongTerms buildEmptyResult() { int supersetSize = topReader.numDocs(); return new SignificantLongTerms( name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), metadata(), format, 0, supersetSize, significanceHeuristic, - emptyList() + emptyList(), + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java index 46e8cea7abc36..3da5a766fc37b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java @@ -130,16 +130,15 @@ public int hashCode() { public SignificantLongTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + super(name, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets, bucketCountThresholds); } /** @@ -158,14 +157,13 @@ public String getWriteableName() { public SignificantLongTerms create(List buckets) { return new SignificantLongTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } @@ -187,14 +185,13 @@ public Bucket createBucket(InternalAggregations aggregations, SignificantLongTer protected SignificantLongTerms create(long subsetSize, long supersetSize, List buckets) { return new SignificantLongTerms( getName(), - requiredSize, - minDocCount, getMetadata(), format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java index d8d93ad7ae159..c70db6005d7cd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTerms.java @@ -135,16 +135,15 @@ public int hashCode() { public SignificantStringTerms( String name, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, long subsetSize, long supersetSize, SignificanceHeuristic significanceHeuristic, - List buckets + List buckets, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { - super(name, requiredSize, minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + super(name, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets, bucketCountThresholds); } /** @@ -163,14 +162,13 @@ public String getWriteableName() { public SignificantStringTerms create(List buckets) { return new SignificantStringTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } @@ -192,14 +190,13 @@ public Bucket createBucket(InternalAggregations aggregations, SignificantStringT protected SignificantStringTerms create(long subsetSize, long supersetSize, List buckets) { return new SignificantStringTerms( getName(), - requiredSize, - minDocCount, getMetadata(), format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index 1dacd4c7de4e8..9f5136dc1df53 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -246,12 +246,7 @@ public Aggregator build( @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - final InternalAggregation aggregation = new UnmappedSignificantTerms( - name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata - ); + final InternalAggregation aggregation = new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); return new NonCollectingAggregator(name, searchContext, parent, factories, metadata) { @Override public InternalAggregation buildEmptyAggregation() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java index c985bf770d4a7..6dedc65ff14e3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StringTerms.java @@ -134,29 +134,27 @@ public StringTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -178,15 +176,14 @@ public StringTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -213,15 +210,14 @@ protected StringTerms create(String name, List buckets, BucketOrder redu name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java index 9e2aa85bb1dd8..7cacf1e918380 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -39,6 +39,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.search.DocValueFormat; +import org.opensearch.search.aggregations.AggregationExecutionException; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.BucketOrder; @@ -195,6 +196,30 @@ public boolean equals(Object obj) { } } + /** + * BucketCountThresholds type that throws an exception when shardMinDocCount or shardSize are accessed. This is used for + * deserialization on the coordinator during reduce as shardMinDocCount and shardSize should not be accessed this way on the + * coordinator. + * + * @opensearch.internal + */ + public static class CoordinatorBucketCountThresholds extends BucketCountThresholds { + + public CoordinatorBucketCountThresholds(long minDocCount, long shardMinDocCount, int requiredSize, int shardSize) { + super(minDocCount, shardMinDocCount, requiredSize, shardSize); + } + + @Override + public long getShardMinDocCount() { + throw new AggregationExecutionException("shard_min_doc_count should not be accessed via CoordinatorBucketCountThresholds"); + } + + @Override + public int getShardSize() { + throw new AggregationExecutionException("shard_size should not be accessed via CoordinatorBucketCountThresholds"); + } + } + protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 11be3da5c8991..62844b4499dba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -265,13 +265,7 @@ public Aggregator build( @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - final InternalAggregation aggregation = new UnmappedTerms( - name, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata - ); + final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds, metadata); Aggregator agg = new NonCollectingAggregator(name, searchContext, parent, factories, metadata) { @Override public InternalAggregation buildEmptyAggregation() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java index 9384f9e793d81..2c8aa8f0a0c37 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedSignificantTerms.java @@ -77,8 +77,12 @@ private Bucket( } } - public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, Map metadata) { - super(name, requiredSize, minDocCount, metadata); + public UnmappedSignificantTerms( + String name, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { + super(name, bucketCountThresholds, metadata); } /** @@ -105,7 +109,7 @@ public String getType() { @Override public UnmappedSignificantTerms create(List buckets) { - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); } @Override @@ -132,7 +136,7 @@ Bucket createBucket( @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, bucketCountThresholds, metadata); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java index 01902f9449bae..3d2bbb93c889a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -72,8 +72,13 @@ private Bucket( } } - public UnmappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map metadata) { - super(name, order, order, requiredSize, minDocCount, metadata); + public UnmappedTerms( + String name, + BucketOrder order, + TermsAggregator.BucketCountThresholds bucketCountThresholds, + Map metadata + ) { + super(name, order, order, bucketCountThresholds, metadata); } /** @@ -100,7 +105,7 @@ public String getType() { @Override public UnmappedTerms create(List buckets) { - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, bucketCountThresholds, metadata); } @Override @@ -120,7 +125,7 @@ protected UnmappedTerms create(String name, List buckets, BucketOrder re @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, bucketCountThresholds, metadata); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java index db05ac84b4aec..edeec00d366fd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTerms.java @@ -121,29 +121,27 @@ public UnsignedLongTerms( String name, BucketOrder reduceOrder, BucketOrder order, - int requiredSize, - long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, - long docCountError + long docCountError, + TermsAggregator.BucketCountThresholds bucketCountThresholds ) { super( name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -165,15 +163,14 @@ public UnsignedLongTerms create(List buckets) { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -195,15 +192,14 @@ protected UnsignedLongTerms create(String name, List buckets, BucketOrde name, reduceOrder, order, - requiredSize, - minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -272,15 +268,14 @@ static DoubleTerms convertUnsignedLongTermsToDouble(UnsignedLongTerms unsignedLo unsignedLongTerms.getName(), unsignedLongTerms.reduceOrder, unsignedLongTerms.order, - unsignedLongTerms.requiredSize, - unsignedLongTerms.minDocCount, unsignedLongTerms.metadata, unsignedLongTerms.format, unsignedLongTerms.shardSize, unsignedLongTerms.showTermDocCountError, unsignedLongTerms.otherDocCount, newBuckets, - unsignedLongTerms.docCountError + unsignedLongTerms.docCountError, + unsignedLongTerms.bucketCountThresholds ); } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index bc2a0658e5a6d..704683a59c473 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -35,6 +35,7 @@ import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; +import org.apache.lucene.util.ArrayUtil; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; import org.opensearch.common.Nullable; @@ -57,6 +58,8 @@ import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -400,6 +403,17 @@ public boolean isConcurrentSegmentSearchEnabled() { return false; } + /** + * Returns local bucket count thresholds based on concurrent segment search status + */ + public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { + if (isConcurrentSegmentSearchEnabled()) { + return new LocalBucketCountThresholds(0, ArrayUtil.MAX_ARRAY_LENGTH - 1); + } else { + return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); + } + } + /** * Adds a releasable that will be freed when this context is closed. */ diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java index 354c635e06fab..59bc90788a1e7 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalAggregationsTests.java @@ -43,6 +43,7 @@ import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.StringTermsTests; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.pipeline.InternalSimpleValueTests; import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; @@ -76,15 +77,14 @@ public void testNonFinalReduceTopLevelPipelineAggs() { "name", BucketOrder.key(true), BucketOrder.key(true), - 10, - 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, 10, 25) ); List aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms))); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); @@ -96,15 +96,14 @@ public void testFinalReduceTopLevelPipelineAggs() { "name", BucketOrder.key(true), BucketOrder.key(true), - 10, - 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), - 0 + 0, + new TermsAggregator.BucketCountThresholds(1, 0, 10, 25) ); InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms)); diff --git a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java index bc48b546e6d28..b7f4094da9990 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/InternalMultiBucketAggregationTests.java @@ -37,6 +37,7 @@ import org.opensearch.search.aggregations.bucket.terms.InternalTerms; import org.opensearch.search.aggregations.bucket.terms.LongTerms; import org.opensearch.search.aggregations.bucket.terms.StringTerms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregator; import org.opensearch.search.aggregations.metrics.InternalAvg; import org.opensearch.search.aggregations.support.AggregationPath; import org.opensearch.test.OpenSearchTestCase; @@ -164,20 +165,18 @@ public void testResolveToSpecificBucket() { DocValueFormat.RAW ) ); - InternalTerms termsAgg = new StringTerms( "string_terms", BucketOrder.count(false), BucketOrder.count(false), - 1, - 0, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(0, 0, 1, 1) ); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); @@ -208,15 +207,14 @@ public void testResolveToMissingSpecificBucket() { "string_terms", BucketOrder.count(false), BucketOrder.count(false), - 1, - 0, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, - 0 + 0, + new TermsAggregator.BucketCountThresholds(0, 0, 1, 1) ); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java index 853d56202c360..5fe9c1dee358d 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/DoubleTermsTests.java @@ -59,6 +59,12 @@ public class DoubleTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -75,15 +81,14 @@ public class DoubleTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -158,15 +163,14 @@ protected Class implementationClass() { name, doubleTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -195,7 +199,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java index 2657f2bdd5138..9f8bab1179ad6 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalMultiTermsTests.java @@ -45,6 +45,12 @@ public class InternalMultiTermsTests extends InternalTermsTestCase { int requiredSize = 3; int shardSize = requiredSize + 2; long otherDocCount = 0; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); final int numBuckets = randomNumberOfBuckets(); @@ -70,15 +76,14 @@ public class InternalMultiTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, shardSize, showTermDocCountError, otherDocCount, docCountError, formats, - buckets + buckets, + bucketCountThresholds ); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java index aab9e91576b18..e640aa92ac782 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTermsTestCase.java @@ -114,7 +114,7 @@ protected abstract InternalSignificantTerms createTestInstance( @Override protected InternalSignificantTerms createUnmappedInstance(String name, Map metadata) { InternalSignificantTerms testInstance = createTestInstance(name, metadata); - return new UnmappedSignificantTerms(name, testInstance.requiredSize, testInstance.minDocCount, metadata); + return new UnmappedSignificantTerms(name, testInstance.bucketCountThresholds, metadata); } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java index d3f7c62021243..2e00248a70771 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/InternalTermsTestCase.java @@ -73,7 +73,7 @@ public void init() { @Override protected InternalTerms createUnmappedInstance(String name, Map metadata) { InternalTerms testInstance = createTestInstance(name, metadata); - return new UnmappedTerms(name, testInstance.order, testInstance.requiredSize, testInstance.minDocCount, metadata); + return new UnmappedTerms(name, testInstance.order, testInstance.bucketCountThresholds, metadata); } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java index becf89c4603f3..44fa9f5e79593 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/LongTermsTests.java @@ -59,6 +59,12 @@ public class LongTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -75,15 +81,14 @@ public class LongTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -158,15 +163,14 @@ protected Class implementationClass() { name, longTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -195,7 +199,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java index b400f5bccca01..24aeed1aeb635 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificanceHeuristicTests.java @@ -125,7 +125,16 @@ public void testStreamResponse() throws Exception { DocValueFormat.RAW, randomDoubleBetween(0, 100, true) ); - return new SignificantLongTerms("some_name", 1, 1, null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); + return new SignificantLongTerms( + "some_name", + null, + DocValueFormat.RAW, + 10, + 20, + heuristic, + singletonList(bucket), + new TermsAggregator.BucketCountThresholds(1, 0, 1, 0) + ); } else { SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket( new BytesRef("someterm"), @@ -137,7 +146,16 @@ public void testStreamResponse() throws Exception { DocValueFormat.RAW, randomDoubleBetween(0, 100, true) ); - return new SignificantStringTerms("some_name", 1, 1, null, DocValueFormat.RAW, 10, 20, heuristic, singletonList(bucket)); + return new SignificantStringTerms( + "some_name", + null, + DocValueFormat.RAW, + 10, + 20, + heuristic, + singletonList(bucket), + new TermsAggregator.BucketCountThresholds(1, 0, 1, 0) + ); } } @@ -204,14 +222,13 @@ SignificantStringTerms createAggregation( ) { return new SignificantStringTerms( "sig_terms", - 2, - -1, emptyMap(), DocValueFormat.RAW, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(-1, 0, 2, 0) ); } @@ -240,14 +257,13 @@ SignificantLongTerms createAggregation( ) { return new SignificantLongTerms( "sig_terms", - 2, - -1, emptyMap(), DocValueFormat.RAW, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(-1, 0, 2, 0) ); } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java index 886e4d8267578..38b478efd004b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTermsTests.java @@ -85,7 +85,17 @@ protected InternalSignificantTerms createTestInstance( bucket.updateScore(significanceHeuristic); buckets.add(bucket); } - return new SignificantLongTerms(name, requiredSize, 1L, metadata, format, subsetSize, supersetSize, significanceHeuristic, buckets); + + return new SignificantLongTerms( + name, + metadata, + format, + subsetSize, + supersetSize, + significanceHeuristic, + buckets, + new TermsAggregator.BucketCountThresholds(1L, 0, requiredSize, 0) + ); } @Override @@ -150,14 +160,13 @@ protected Class implementationClass() { } return new SignificantLongTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0) ); } else { String name = instance.getName(); @@ -185,7 +194,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java index 63a08a7aa1683..3ac30248ef353 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/SignificantStringTermsTests.java @@ -80,14 +80,13 @@ protected InternalSignificantTerms createTestInstance( } return new SignificantStringTerms( name, - requiredSize, - 1L, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(1L, 0, requiredSize, 0) ); } @@ -153,14 +152,13 @@ protected Class implementationClass() { } return new SignificantStringTerms( name, - requiredSize, - minDocCount, metadata, format, subsetSize, supersetSize, significanceHeuristic, - buckets + buckets, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0) ); } else { String name = instance.getName(); @@ -188,7 +186,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata); + return new UnmappedSignificantTerms(name, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java index 6757c8e00f83d..deba96fd3ae19 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StringTermsTests.java @@ -140,15 +140,14 @@ protected Class implementationClass() { name, stringTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -177,7 +176,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } @@ -206,6 +205,12 @@ private BytesRef[] generateRandomDict() { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = DocValueFormat.RAW; long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -226,15 +231,14 @@ private BytesRef[] generateRandomDict() { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java index b961039e50501..478961c2a404c 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/UnsignedLongTermsTests.java @@ -36,6 +36,12 @@ public class UnsignedLongTermsTests extends InternalTermsTestCase { long minDocCount = 1; int requiredSize = 3; int shardSize = requiredSize + 2; + TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds( + minDocCount, + 0, + requiredSize, + shardSize + ); DocValueFormat format = randomNumericDocValueFormat(); long otherDocCount = 0; List buckets = new ArrayList<>(); @@ -52,15 +58,14 @@ public class UnsignedLongTermsTests extends InternalTermsTestCase { name, reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + bucketCountThresholds ); } @@ -135,15 +140,14 @@ protected Class implementationClass() { name, longTerms.reduceOrder, order, - requiredSize, - minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, - docCountError + docCountError, + new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, shardSize) ); } else { String name = instance.getName(); @@ -172,7 +176,7 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata); + return new UnmappedTerms(name, order, new TermsAggregator.BucketCountThresholds(minDocCount, 0, requiredSize, 0), metadata); } } } diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 874d60a4097f2..27c406b019c77 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -350,6 +350,7 @@ public boolean shouldCache(Query query) { when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor()); + when(searchContext.asLocalBucketCountThresholds(any())).thenCallRealMethod(); /* * Always use the circuit breaking big arrays instance so that the CircuitBreakerService * we're passed gets a chance to break.