From 6cf52cc14853bd031ccc2c980e1e2f164ae3bfcc Mon Sep 17 00:00:00 2001 From: Sorabh Date: Thu, 13 Jul 2023 10:08:52 -0700 Subject: [PATCH] CardinalityIT/NestedIT test failures with concurrent search enabled and AssertingCodec (#8303) * CardinalityIT/NestedIT test failures with concurrent search enabled and AssertingCodec The tests were failing because during the concurrent segment search for each slice the codec producers for the leafs were initialized by the slice thread. Later in reduce phase, the post collection happens over those codec producers on the search thread. With AssertingCodec it verifies that all access is done by the same thread causing the failures Signed-off-by: Sorabh Hamirwasia * Address review comments Signed-off-by: Sorabh Hamirwasia --------- Signed-off-by: Sorabh Hamirwasia --- CHANGELOG.md | 3 + .../search/DefaultSearchContext.java | 13 ++- .../AggregationCollectorManager.java | 26 +---- .../BucketCollectorProcessor.java | 105 ++++++++++++++++++ .../ConcurrentAggregationProcessor.java | 6 +- .../DefaultAggregationProcessor.java | 4 + .../search/internal/ContextIndexSearcher.java | 1 + .../internal/FilteredSearchContext.java | 11 ++ .../search/internal/SearchContext.java | 22 ++++ .../search/SearchCancellationTests.java | 13 ++- .../internal/ContextIndexSearcherTests.java | 1 + .../profile/query/QueryProfilerTests.java | 1 + .../search/query/QueryPhaseTests.java | 2 + .../search/query/QueryProfilePhaseTests.java | 2 + .../aggregations/AggregatorTestCase.java | 1 + .../opensearch/test/TestSearchContext.java | 12 ++ 16 files changed, 194 insertions(+), 29 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 87e4e75b43ae0..e5a5a601fa169 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -173,6 +173,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312)) - Optimize Metadata build() to skip redundant computations as part of ClusterState build ([#7853](https://github.com/opensearch-project/OpenSearch/pull/7853)) - Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208)) +- Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411)) +- Add wrapper tracer implementation ([#8565](https://github.com/opensearch-project/OpenSearch/pull/8565)) +- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index e83bfe8486904..ee29d6bfe2b62 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -65,6 +65,7 @@ import org.opensearch.index.search.NestedHelper; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; @@ -176,7 +177,7 @@ final class DefaultSearchContext extends SearchContext { private SuggestionSearchContext suggest; private List rescore; private Profilers profilers; - + private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; private final Map searchExtBuilders = new HashMap<>(); private final Map, CollectorManager> queryCollectorManagers = new HashMap<>(); private final QueryShardContext queryShardContext; @@ -919,4 +920,14 @@ public ReaderContext readerContext() { public InternalAggregation.ReduceContext partial() { return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction(); } + + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + this.bucketCollectorProcessor = bucketCollectorProcessor; + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return bucketCollectorProcessor; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 03519b335bbea..0b36fc8b0cc5a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -17,11 +17,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; /** @@ -53,31 +50,12 @@ public Collector newCollector() throws IOException { @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - List aggregators = new ArrayList<>(); - - final Deque allCollectors = new LinkedList<>(collectors); - while (!allCollectors.isEmpty()) { - final Collector currentCollector = allCollectors.pop(); - if (currentCollector instanceof Aggregator) { - aggregators.add((Aggregator) currentCollector); - } else if (currentCollector instanceof InternalProfileCollector) { - if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { - aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()); - } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { - allCollectors.addAll( - Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) - ); - } - } else if (currentCollector instanceof MultiBucketCollector) { - allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); - } - } - + final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); final List internals = new ArrayList<>(aggregators.size()); context.aggregations().resetBucketMultiConsumer(); for (Aggregator aggregator : aggregators) { try { - aggregator.postCollection(); + // post collection is called in ContextIndexSearcher after search on leaves are completed internals.add(aggregator.buildTopLevel()); } catch (IOException e) { throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java new file mode 100644 index 0000000000000..352ecf8bc94ad --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations; + +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.MultiCollector; +import org.opensearch.common.lucene.MinimumScoreCollector; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.profile.query.InternalProfileCollector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * Processor to perform collector level processing specific to {@link BucketCollector} in different stages like: a) PostCollection + * after search on each leaf is completed and b) process the collectors to perform reduce after collection is completed + */ +public class BucketCollectorProcessor { + + /** + * Performs {@link BucketCollector#postCollection()} on all the {@link BucketCollector} in the given {@link Collector} collector tree + * after the collection of documents on a leaf is completed. This method will be called by different slice threads on its own collector + * tree instance in case of concurrent segment search such that postCollection happens on the same slice thread which initialize and + * perform collection of the documents for a leaf segment. For sequential search case, there is always a single search thread which + * performs both collection and postCollection on {@link BucketCollector}. + *

+ * This was originally done in {@link org.opensearch.search.aggregations.AggregationProcessor#postProcess(SearchContext)}. But with + * concurrent segment search path this needs to be performed here. There are AssertingCodecs in lucene which validates that the + * DocValues created for a field is always used by the same thread for a request. In concurrent segment search case, the DocValues + * gets initialized on different threads for different segments (or slices). Whereas the postProcess happens as part of reduce phase + * and is performed on the separate thread which is from search threadpool and not from slice threadpool. So two different threads + * performs the access on the DocValues causing the AssertingCodec to fail. From functionality perspective, there is no issue as + * DocValues for each segment is always accessed by a single thread at a time but those threads may be different (e.g. slice thread + * during collection and then search thread during reduce) + *

+ *

+ * NOTE: We can evaluate and deprecate this postCollection processing once lucene release the changes described in the + * issue-12375. With this new change we should be able to implement + * {@link BucketCollector#postCollection()} functionality using the lucene interface directly such that postCollection gets called + * from the slice thread by lucene itself + *

+ * @param collectorTree collector tree used by calling thread + */ + public void processPostCollection(Collector collectorTree) throws IOException { + final Queue collectors = new LinkedList<>(); + collectors.offer(collectorTree); + while (!collectors.isEmpty()) { + Collector currentCollector = collectors.poll(); + if (currentCollector instanceof InternalProfileCollector) { + collectors.offer(((InternalProfileCollector) currentCollector).getCollector()); + } else if (currentCollector instanceof MinimumScoreCollector) { + collectors.offer(((MinimumScoreCollector) currentCollector).getCollector()); + } else if (currentCollector instanceof MultiCollector) { + for (Collector innerCollector : ((MultiCollector) currentCollector).getCollectors()) { + collectors.offer(innerCollector); + } + } else if (currentCollector instanceof BucketCollector) { + ((BucketCollector) currentCollector).postCollection(); + } + } + } + + /** + * Unwraps the input collection of {@link Collector} to get the list of the {@link Aggregator} used by different slice threads. The + * input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager} + * during the reduce phase. This list of {@link Aggregator} is used to create {@link InternalAggregation} and optionally perform + * reduce at shard level before returning response to coordinator + * @param collectors collection of aggregation collectors to reduce + * @return list of unwrapped {@link Aggregator} + */ + public List toAggregators(Collection collectors) { + List aggregators = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof Aggregator) { + aggregators.add((Aggregator) currentCollector); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { + aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + return aggregators; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java index 592fb8cc6e674..336ad8739eb41 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java @@ -28,12 +28,16 @@ * avoid the increase in aggregation result sets returned by each shard to coordinator where final reduce happens for results received from * all the shards */ -public class ConcurrentAggregationProcessor extends DefaultAggregationProcessor { +public class ConcurrentAggregationProcessor implements AggregationProcessor { + + private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor(); @Override public void preProcess(SearchContext context) { try { if (context.aggregations() != null) { + // update the bucket collector process as there is aggregation in the request + context.setBucketCollectorProcessor(bucketCollectorProcessor); if (context.aggregations().factories().hasNonGlobalAggregator()) { context.queryCollectorManagers().put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManager(context)); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java index 05aa4a9acb270..24b05ebcf3a61 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java @@ -24,10 +24,14 @@ */ public class DefaultAggregationProcessor implements AggregationProcessor { + private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor(); + @Override public void preProcess(SearchContext context) { try { if (context.aggregations() != null) { + // update the bucket collector process as there is aggregation in the request + context.setBucketCollectorProcessor(bucketCollectorProcessor); if (context.aggregations().factories().hasNonGlobalAggregator()) { context.queryCollectorManagers() .put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManagerWithSingleCollector(context)); diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 2f1c5475c9cf6..e3ca932eb4699 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -276,6 +276,7 @@ protected void search(List leaves, Weight weight, Collector c searchLeaf(leaves.get(i), weight, collector); } } + searchContext.bucketCollectorProcessor().processPostCollection(collector); } /** diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 9bfc0e8b6fea5..790d2ed5ee4b7 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -50,6 +50,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -548,4 +549,14 @@ public ReaderContext readerContext() { public InternalAggregation.ReduceContext partial() { return in.partial(); } + + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + in.setBucketCollectorProcessor(bucketCollectorProcessor); + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return in.bucketCollectorProcessor(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 319a5624bbf56..fd02ba2ba12bb 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -53,6 +53,8 @@ import org.opensearch.search.RescoreDocIds; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.Aggregator; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -73,6 +75,7 @@ import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.suggest.SuggestionSearchContext; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,6 +97,20 @@ public abstract class SearchContext implements Releasable { public static final int TRACK_TOTAL_HITS_DISABLED = -1; public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000; + // no-op bucket collector processor + public static final BucketCollectorProcessor NO_OP_BUCKET_COLLECTOR_PROCESSOR = new BucketCollectorProcessor() { + @Override + public void processPostCollection(Collector collectorTree) { + // do nothing as there is no aggregation collector + } + + @Override + public List toAggregators(Collection collectors) { + // should not be called when there is no aggregation collector + throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); + } + }; + private final List releasables = new CopyOnWriteArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; @@ -449,4 +466,9 @@ public String toString() { public abstract ReaderContext readerContext(); public abstract InternalAggregation.ReduceContext partial(); + + // processor used for bucket collectors + public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor); + + public abstract BucketCollectorProcessor bucketCollectorProcessor(); } diff --git a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java index e67123bf2c51e..011723da36a30 100644 --- a/server/src/test/java/org/opensearch/search/SearchCancellationTests.java +++ b/server/src/test/java/org/opensearch/search/SearchCancellationTests.java @@ -48,6 +48,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.automaton.CompiledAutomaton; import org.apache.lucene.util.automaton.RegExp; +import org.junit.Before; import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.search.internal.ContextIndexSearcher; @@ -71,6 +72,7 @@ public class SearchCancellationTests extends OpenSearchTestCase { private static Directory dir; private static IndexReader reader; + private SearchContext searchContext; @BeforeClass public static void setup() throws IOException { @@ -106,6 +108,12 @@ public static void cleanup() throws IOException { reader = null; } + @Before + public void testSetup() { + searchContext = mock(SearchContext.class); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + } + public void testAddingCancellationActions() throws IOException { ContextIndexSearcher searcher = new ContextIndexSearcher( reader, @@ -114,7 +122,7 @@ public void testAddingCancellationActions() throws IOException { IndexSearcher.getDefaultQueryCachingPolicy(), true, null, - mock(SearchContext.class) + searchContext ); NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null)); assertEquals("cancellation runnable should not be null", npe.getMessage()); @@ -128,7 +136,6 @@ public void testAddingCancellationActions() throws IOException { public void testCancellableCollector() throws IOException { TotalHitCountCollector collector1 = new TotalHitCountCollector(); Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); }; - SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); ContextIndexSearcher searcher = new ContextIndexSearcher( @@ -167,7 +174,7 @@ public void testExitableDirectoryReader() throws IOException { IndexSearcher.getDefaultQueryCachingPolicy(), true, null, - mock(SearchContext.class) + searchContext ); searcher.addQueryCancellation(cancellation); CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton()); diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index a2c4288e081b3..f3907355ac6ec 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -259,6 +259,7 @@ public void onRemoval(ShardId shardId, Accountable accountable) { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); ContextIndexSearcher searcher = new ContextIndexSearcher( filteredReader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index f0f692e15f066..528d65bcc5ef2 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -120,6 +120,7 @@ public void setUp() throws Exception { SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); searcher = new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index cf4d7798056c4..61b78905334ec 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -1207,6 +1207,7 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), @@ -1223,6 +1224,7 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java index 9c27783263c5a..6d30d7993c850 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryProfilePhaseTests.java @@ -1427,6 +1427,7 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), @@ -1443,6 +1444,7 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead SearchContext searchContext = mock(SearchContext.class); IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java index 3cddf4659fea9..60d337599771c 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java @@ -349,6 +349,7 @@ public boolean shouldCache(Query query) { when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.aggregations()).thenReturn(new SearchContextAggregations(AggregatorFactories.EMPTY, bucketConsumer)); when(searchContext.query()).thenReturn(query); + when(searchContext.bucketCollectorProcessor()).thenReturn(new BucketCollectorProcessor()); /* * Always use the circuit breaking big arrays instance so that the CircuitBreakerService * we're passed gets a chance to break. diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 60425077b14a0..694f88d944f71 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -52,6 +52,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.aggregations.BucketCollectorProcessor; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.collapse.CollapseContext; @@ -116,6 +117,7 @@ public class TestSearchContext extends SearchContext { private Profilers profilers; private CollapseContext collapse; protected boolean concurrentSegmentSearchEnabled; + private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; /** * Sets the concurrent segment search enabled field @@ -661,6 +663,16 @@ public InternalAggregation.ReduceContext partial() { return InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction(); } + @Override + public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) { + this.bucketCollectorProcessor = bucketCollectorProcessor; + } + + @Override + public BucketCollectorProcessor bucketCollectorProcessor() { + return bucketCollectorProcessor; + } + /** * Clean the query results by consuming all of it */