From 28fa23b8cb69dc2e78d6aaf491d08d47b1164e43 Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 30 Jan 2020 22:52:08 +0100 Subject: [PATCH 01/10] Always rewrite search shard request outside of the search thread pool This change ensures that the rewrite of the shard request is executed in the network thread or in the refresh listener when waiting for an active shard. This allows queries that rewrite to match_no_docs to bypass the search thread pool entirely even if the can_match phase was skipped (pre_filter_shard_size > number of shards). Coordinating nodes don't have the ability to create empty responses so this change also ensures that at least one shard creates a full empty response while the other can return null ones. This is needed since creating true empty responses on shards require to create concrete aggregators which would be too costly to build on a network thread. We should move this functionality to aggregation builders in a follow up but that would be a much bigger change. This change is also important for #49601 since we want to add the ability to use the result of other shards to rewrite the request of subsequent ones. For instance if the first M shards have their top N computed, the top worst document in the global queue can be pass to subsequent shards that can then rewrite to match_no_docs if they can guarantee that they don't have any document better than the provided one. --- .../search/140_pre_filter_search_shards.yml | 29 ++ .../search/AbstractSearchAsyncAction.java | 14 +- .../action/search/SearchPhaseController.java | 66 +++-- .../elasticsearch/index/shard/IndexShard.java | 7 + .../elasticsearch/search/SearchService.java | 199 ++++++++++--- .../search/internal/ShardSearchRequest.java | 26 +- .../search/query/QuerySearchResult.java | 55 +++- .../search/SearchPhaseControllerTests.java | 61 +++- .../indices/IndicesRequestCacheIT.java | 13 +- .../search/SearchServiceTests.java | 268 +++++++++++++----- .../index/engine/FrozenIndexTests.java | 38 ++- 11 files changed, 612 insertions(+), 164 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml index e9fb959406e0e..636f3d88e10cf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml @@ -153,3 +153,32 @@ setup: - match: { _shards.failed: 0 } - match: { hits.total: 2 } - length: { aggregations.idx_terms.buckets: 2 } + + # check that empty responses are correctly handled when rewriting to match_no_docs + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 1 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 2 } + - length: { aggregations.idx_terms.buckets: 2 } + + - do: + search: + rest_total_hits_as_int: true + # ensure that one shard can return empty response + max_concurrent_shard_requests: 2 + body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } } + + - match: { _shards.total: 3 } + - match: { _shards.successful: 3 } + - match: { _shards.skipped : 0 } + - match: { _shards.failed: 0 } + - match: { hits.total: 0 } + - length: { aggregations.idx_terms.buckets: 0 } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index ca68bb4008146..bc7b347531ad0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -51,6 +51,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -75,13 +76,14 @@ abstract class AbstractSearchAsyncAction exten **/ private final BiFunction nodeIdToConnection; private final SearchTask task; - private final SearchPhaseResults results; + protected final SearchPhaseResults results; private final long clusterStateVersion; private final Map aliasFilter; private final Map concreteIndexBoosts; private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); + private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); private final SearchTimeProvider timeProvider; @@ -462,9 +464,10 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg * @param result the result returned form the shard * @param shardIt the shard iterator */ - private void onShardResult(Result result, SearchShardIterator shardIt) { + protected void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getShardIndex() != -1 : "shard index is not set"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; + hasShardResponse.set(true); successfulOps.incrementAndGet(); results.consumeResult(result); if (logger.isTraceEnabled()) { @@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar String indexName = shardIt.shardId().getIndex().getName(); final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) .toArray(new String[0]); - return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings); + // if we already received a search result we can inform the shard that it + // can return a null response if the request rewrites to match none rather + // than creating an empty response in the search thread pool. + shardRequest.setMatchNoDocsReturnNullResponse(hasShardResponse.get()); + return shardRequest; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 27b5c9cf3b2a8..06bb29a51ce6c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; +import java.util.stream.Collectors; public final class SearchPhaseController { @@ -427,6 +428,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection res.queryResult().isNull() == false) + .collect(Collectors.toList()); + String errorMsg = "must have at least one non-empty search result, got 0 out of " + total; + assert queryResults.isEmpty() == false : errorMsg; + if (queryResults.isEmpty()) { + throw new IllegalStateException(errorMsg); + } final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); @@ -622,36 +632,38 @@ public void consumeResult(SearchPhaseResult result) { } private synchronized void consumeInternal(QuerySearchResult querySearchResult) { - if (index == bufferSize) { + if (querySearchResult.isNull() == false) { + if (index == bufferSize) { + if (hasAggs) { + ReduceContext reduceContext = controller.reduceContextFunction.apply(false); + InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); + Arrays.fill(aggsBuffer, null); + aggsBuffer[0] = reducedAggs; + } + if (hasTopDocs) { + TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), + // we have to merge here in the same way we collect on a shard + querySearchResult.from() + querySearchResult.size(), 0); + Arrays.fill(topDocsBuffer, null); + topDocsBuffer[0] = reducedTopDocs; + } + numReducePhases++; + index = 1; + if (hasAggs) { + progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), + topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); + } + } + final int i = index++; if (hasAggs) { - ReduceContext reduceContext = controller.reduceContextFunction.apply(false); - InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext); - Arrays.fill(aggsBuffer, null); - aggsBuffer[0] = reducedAggs; + aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); } if (hasTopDocs) { - TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), - // we have to merge here in the same way we collect on a shard - querySearchResult.from() + querySearchResult.size(), 0); - Arrays.fill(topDocsBuffer, null); - topDocsBuffer[0] = reducedTopDocs; + final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null + topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); + setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); + topDocsBuffer[i] = topDocs.topDocs; } - numReducePhases++; - index = 1; - if (hasAggs) { - progressListener.notifyPartialReduce(progressListener.searchShards(processedShards), - topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases); - } - } - final int i = index++; - if (hasAggs) { - aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs(); - } - if (hasTopDocs) { - final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null - topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly()); - setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); - topDocsBuffer[i] = topDocs.topDocs; } processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } @@ -731,7 +743,7 @@ ReducedQueryPhase reduce() { static final class TopDocsStats { final int trackTotalHitsUpTo; - private long totalHits; + long totalHits; private TotalHits.Relation totalHitsRelation; long fetchHits; private float maxScore = Float.NEGATIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 759feb1d7c68e..7ab04ab6d48b3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1210,6 +1210,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); + return wrapSearcher(searcher); + } + + /** + * Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}. + */ + public Engine.Searcher wrapSearcher(Engine.Searcher searcher) { assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader"; boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 84c4903dfd838..ffca1b96ab630 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -305,11 +305,31 @@ protected void doClose() { } public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); + rewriteShardRequest(request, ActionListener.wrap( + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context -> { + try { + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeDfsPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); } - private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException { - final SearchContext context = createAndPutContext(request); + private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException { + final SearchContext context = createAndPutContext(rewriteContext); context.incRef(); try { context.setTask(task); @@ -340,15 +360,59 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); + assert request.isMatchNoDocsReturnNullResponse() == false || request.numberOfShards() > 1 + : "empty responses require more than one shard"; + rewriteShardRequest(request, ActionListener.wrap( + context -> { + try { + ShardSearchRequest rewritten = context.request; + if (rewritten.isMatchNoDocsReturnNullResponse() + && canRewriteToMatchNone(rewritten.source()) + && rewritten.source().query() instanceof MatchNoneQueryBuilder) { + onMatchNoDocs(context, listener); + } else { + // fork the execution in the search thread pool and wraps the searcher + // to execute the query + context.wrapSearcher().execute(() -> { + final SearchPhaseResult result; + try { + result = executeQueryPhase(context, task); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(result); + }); + } + } catch (Exception exc) { + // if the execution is rejected we need to close the searcher + IOUtils.closeWhileHandlingException(context.searcher); + listener.onFailure(exc); + } + }, listener::onFailure)); + } + + private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener listener) { + // creates a lightweight search context that we use to inform context listeners + // before closing + SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout); + try (searchContext) { + onNewContext(searchContext); + onFreeContext(searchContext); + } catch (Exception exc) { + listener.onFailure(exc); + return; + } + listener.onResponse(QuerySearchResult.nullInstance()); } private void runAsync(long id, Supplier executable, ActionListener listener) { getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception { - final SearchContext context = createAndPutContext(request); + private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception { + final SearchContext context = createAndPutContext(rewriteContext); + final ShardSearchRequest request = rewriteContext.request; context.incRef(); try { context.setTask(task); @@ -539,15 +603,8 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear } } - final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException { - if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) { - throw new ElasticsearchException( - "Trying to create too many scroll contexts. Must be less than or equal to: [" + - maxOpenScrollContext + "]. " + "This limit can be set by changing the [" - + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); - } - - SearchContext context = createContext(request); + final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) { + SearchContext context = createContext(rewriteContext); onNewContext(context); boolean success = false; try { @@ -581,9 +638,16 @@ private void onNewContext(SearchContext context) { } } - final SearchContext createContext(ShardSearchRequest request) throws IOException { - final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); + final SearchContext createContext(SearchRewriteContext rewriteContext) { + final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout); try { + if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) { + throw new ElasticsearchException( + "Trying to create too many scroll contexts. Must be less than or equal to: [" + + maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); + } + final ShardSearchRequest request = rewriteContext.request; if (request.scroll() != null) { context.scrollContext(new ScrollContext()); context.scrollContext().scroll = request.scroll(); @@ -619,41 +683,32 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException } public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { - return createSearchContext(request, timeout, true, "search"); + IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + // make sure that we wrap the searcher when executing the query + return createSearchContext(rewriteContext.wrapSearcher(), timeout); } - private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, - boolean assertAsyncActions, String source) - throws IOException { + private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) { + final ShardSearchRequest request = rewriteContext.request; + final Engine.Searcher searcher = rewriteContext.searcher; IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE); - Engine.Searcher searcher = indexShard.acquireSearcher(source); - boolean success = false; - DefaultSearchContext searchContext = null; try { - searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, + DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout, fetchPhase); - // we clone the query shard context here just for rewriting otherwise we - // might end up with incorrect state since we are using now() or script services - // during rewrite and normalized / evaluate templates etc. - QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext()); - Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions); - assert searchContext.getQueryShardContext().isCacheable(); success = true; + return searchContext; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(searchContext); - if (searchContext == null) { - // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise - // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). - IOUtils.closeWhileHandlingException(searcher); - } + // we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise + // leak a searcher and this can have severe implications (unable to obtain shard lock exceptions). + IOUtils.closeWhileHandlingException(rewriteContext.searcher); } } - return searchContext; } private void freeAllContextForIndex(Index index) { @@ -1059,24 +1114,49 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { return aggregations == null || aggregations.mustVisitAllDocs() == false; } - /* * Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously - * The action listener is guaranteed to be executed on the search thread-pool + * and then rewrites with a searcher when the shard is active. + * The provided action listener is executed on the same thread or in a listener threadpool. */ - private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { + private void rewriteShardRequest(ShardSearchRequest request, ActionListener listener) { IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); - Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register - shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))), - listener::onFailure); + shard.awaitShardSearchActive(b -> { + try { + SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); + listener.onResponse(rewriteContext); + } catch (Exception e) { + listener.onFailure(e); + } + }), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // adding a lot of overhead Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener); } + SearchRewriteContext acquireSearcherAndRewrite(ShardSearchRequest request, IndexShard shard) throws IOException { + // acquire the searcher for rewrite with no wrapping in order to avoid costly + // operations. We'll wrap the searcher at a later stage (when executing the query). + Engine.Searcher searcher = shard.acquireSearcherNoWrap("search"); + boolean success = false; + try { + IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher, + request::nowInMillis, request.getClusterAlias()); + Rewriteable.rewrite(request.getRewriteable(), context, true); + SearchRewriteContext rewrite = new SearchRewriteContext(request, shard, searcher, getExecutor(shard)); + success = true; + return rewrite; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(searcher); + } + } + } + /** * Returns a new {@link QueryRewriteContext} with the given {@code now} provider */ @@ -1093,6 +1173,37 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce); } + static class SearchRewriteContext { + private final ShardSearchRequest request; + private final IndexShard shard; + private Engine.Searcher searcher; + private final Executor executor; + + private boolean isWrapped; + + private SearchRewriteContext(ShardSearchRequest request, + IndexShard shard, + Engine.Searcher searcher, + Executor executor) { + this.request = request; + this.shard = shard; + this.searcher = searcher; + this.executor = executor; + } + + SearchRewriteContext wrapSearcher() { + assert isWrapped == false : "searcher already wrapped"; + isWrapped = true; + searcher = shard.wrapSearcher(searcher); + return this; + } + + void execute(Runnable runnable) { + assert isWrapped : "searcher is not wrapped"; + executor.execute(runnable); + } + } + public static final class CanMatchResponse extends SearchPhaseResult { private final boolean canMatch; private final MinAndMax minAndMax; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 27543a2285d32..47b10f48e0d99 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -47,6 +47,7 @@ import org.elasticsearch.indices.InvalidAliasNameException; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; @@ -75,7 +76,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final String preference; private final OriginalIndices originalIndices; - //these are the only two mutable fields, as they are subject to rewriting + private boolean matchNoDocsReturnNullResponse; + + //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; private SearchSourceBuilder source; @@ -167,6 +170,11 @@ public ShardSearchRequest(StreamInput in) throws IOException { allowPartialSearchResults = in.readBoolean(); indexRoutings = in.readStringArray(); preference = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + matchNoDocsReturnNullResponse = in.readBoolean(); + } else { + matchNoDocsReturnNullResponse = true; + } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -201,6 +209,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeStringArray(indexRoutings); out.writeOptionalString(preference); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeBoolean(matchNoDocsReturnNullResponse); + } } @Override @@ -275,6 +286,19 @@ public String preference() { return preference; } + /** + * Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}. + * Defaults to false since the coordinator node needs at least one shard response to build the global + * response. + */ + public boolean isMatchNoDocsReturnNullResponse() { + return matchNoDocsReturnNullResponse; + } + + public void setMatchNoDocsReturnNullResponse(boolean value) { + this.matchNoDocsReturnNullResponse = value; + } + /** * Returns the cache key for this shard search request, based on its content */ diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 3d49c96d56119..3151dadadde6b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -63,18 +63,56 @@ public final class QuerySearchResult extends SearchPhaseResult { private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; + private final boolean isNull; + public QuerySearchResult() { + this(false); } public QuerySearchResult(StreamInput in) throws IOException { super(in); - long id = in.readLong(); - readFromWithId(id, in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + isNull = in.readBoolean(); + } else { + isNull = false; + } + if (isNull == false) { + long id = in.readLong(); + readFromWithId(id, in); + } } public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); + isNull = false; + } + + private QuerySearchResult(boolean isNull) { + this.isNull = isNull; + } + + private static final QuerySearchResult nullInstance = new QuerySearchResult(true); + + /** + * Returns an instance that contains no response. + */ + public static QuerySearchResult nullInstance() { + return nullInstance; + } + + /** + * Returns true if the result doesn't contain any useful information. + * It is used by the search action to avoid creating an empty response on + * shard request that rewrites to match_no_docs. + * + * TODO: Currently we need the concrete aggregators to build empty responses. This means that we cannot + * build an empty response in the coordinating node so we rely on this hack to ensure that at least one shard + * returns a valid empty response. We should move the ability to create empty responses to aggregation builders + * in order to allow building empty responses directly from the coordinating node. + */ + public boolean isNull() { + return isNull; } @Override @@ -173,6 +211,10 @@ public void aggregations(InternalAggregations aggregations) { hasAggs = aggregations != null; } + public InternalAggregations aggregations() { + return aggregations; + } + /** * Returns and nulls out the profiled results for this search, or potentially null if result was empty. * This allows to free up memory once the profiled result is consumed. @@ -300,8 +342,13 @@ public void readFromWithId(long id, StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(requestId); - writeToNoId(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(isNull); + } + if (isNull == false) { + out.writeLong(requestId); + writeToNoId(out); + } } public void writeToNoId(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 641d5bf2c59b4..f49d3a69caca0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -338,16 +338,37 @@ private static SearchRequest randomSearchRequest() { } public void testConsumer() { + consumerTestCase(0); + } + + public void testConsumerWithEmptyResponse() { + consumerTestCase(randomIntBetween(1, 5)); + } + + private void consumerTestCase(int numEmptyResponses) { + int numShards = 3 + numEmptyResponses; int bufferSize = randomIntBetween(2, 3); SearchRequest request = randomSearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - ArraySearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3); - assertEquals(0, reductions.size()); + ArraySearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(NOOP, request, 3+numEmptyResponses); + if (numEmptyResponses == 0) { + assertEquals(0, reductions.size()); + } + if (numEmptyResponses > 0) { + QuerySearchResult empty = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + empty.setShardIndex(2+numEmptyResponses); + empty.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(empty); + numEmptyResponses --; + } + QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -356,7 +377,7 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); @@ -365,20 +386,38 @@ public void testConsumer() { result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0]); + new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); result.setShardIndex(1); consumer.consumeResult(result); + + while (numEmptyResponses > 0) { + result = QuerySearchResult.nullInstance(); + int shardId = 2 + numEmptyResponses; + result.setShardIndex(shardId); + result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE)); + consumer.consumeResult(result); + numEmptyResponses--; + + } + final int numTotalReducePhases; - if (bufferSize == 2) { + if (numShards > bufferSize) { assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); - assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases()); - assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered()); - assertEquals(1, reductions.size()); - assertEquals(false, reductions.get(0)); - numTotalReducePhases = 2; + if (bufferSize == 2) { + assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(1, reductions.size()); + assertEquals(false, reductions.get(0)); + numTotalReducePhases = 2; + } else { + assertEquals(0, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases()); + assertEquals(3, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered()); + assertEquals(0, reductions.size()); + numTotalReducePhases = 1; + } } else { assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class))); assertEquals(0, reductions.size()); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java index 3ea41962e4855..fa83cb2662a8e 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; @@ -123,20 +124,26 @@ public void testQueryRewrite() throws Exception { assertCacheState(client, "index", 0, 0); final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE).get(); + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 0, 5); final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) + // to ensure that query is executed even if it rewrites to match_no_docs + .addAggregation(new GlobalAggregationBuilder("global")) .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(7L)); assertCacheState(client, "index", 3, 7); final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) - .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE) + .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")) + .addAggregation(new GlobalAggregationBuilder("global")) + .setPreFilterShardSize(Integer.MAX_VALUE) .get(); ElasticsearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(7L)); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 5691530dd32fe..75435f40f7dfb 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -77,6 +78,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.Before; @@ -168,10 +170,12 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { @Override public void onNewContext(SearchContext context) { - if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); - } else { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + if (context.query() != null) { + if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); + } else { + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + } } } @@ -357,15 +361,11 @@ public void testTimeout() throws IOException { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final SearchContext contextWithDefaultTimeout = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext contextWithDefaultTimeout = service.createContext(rewriteContext); try { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -376,15 +376,11 @@ public void testTimeout() throws IOException { final long seconds = randomIntBetween(6, 10); searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds))); - final SearchContext context = service.createContext( - new ShardSearchRequest( - OriginalIndices.NONE, - searchRequest, - indexShard.shardId(), - 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null) - ); + rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + final SearchContext context = service.createContext(rewriteContext); try { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); @@ -412,19 +408,25 @@ public void testMaxDocvalueFieldsSearch() throws IOException { for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) { searchSourceBuilder.docValueField("field" + i); } - try (SearchContext context = service.createContext( - new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)) - ) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); searchSourceBuilder.docValueField("one_field_too_much"); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " - + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", - ex.getMessage()); + "Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. " + + "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", ex.getMessage()); } } @@ -447,20 +449,28 @@ public void testMaxScriptFieldsSearch() throws IOException { searchSourceBuilder.scriptField("field" + i, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertNotNull(context); + + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + { + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertNotNull(context); + } + } + + { searchSourceBuilder.scriptField("anotherScriptField", - new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, - () -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))); + () -> service.createContext(rewriteContext)); assertEquals( - "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" - + (maxScriptFields + 1) - + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", - ex.getMessage()); + "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was [" + + (maxScriptFields + 1) + + "]. This limit can be set by changing the [index.max_script_fields] index level setting.", + ex.getMessage()); } } @@ -477,17 +487,19 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { searchSourceBuilder.scriptField("field" + 0, new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); - try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, - searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null))) { - assertEquals(0, context.scriptFields().fields().size()); + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard); + try (SearchContext context = service.createContext(rewriteContext)) { + assertEquals(0, context.scriptFields().fields().size()); } } /** * test that creating more than the allowed number of scroll contexts throws an exception */ - public void testMaxOpenScrollContexts() throws RuntimeException { + public void testMaxOpenScrollContexts() throws RuntimeException, IOException { createIndex("index"); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -513,8 +525,10 @@ public void testMaxOpenScrollContexts() throws RuntimeException { client().prepareSearch("index").setSize(1).setScroll("1m").get(); } + SearchService.SearchRewriteContext rewriteContext = + service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()))); + () -> service.createAndPutContext(rewriteContext)); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -592,7 +606,7 @@ public Scroll scroll() { } } - public void testCanMatch() throws IOException { + public void testCanMatch() throws IOException, InterruptedException { createIndex("index"); final SearchService service = getInstanceFromNode(SearchService.class); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -625,11 +639,32 @@ public void testCanMatch() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); assertEquals(numWrapReader, numWrapInvocations.get()); - // make sure that the wrapper is called when the context is actually created - service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1f, -1, null, null)).close(); - assertEquals(numWrapReader+1, numWrapInvocations.get()); + ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + + CountDownLatch latch = new CountDownLatch(1); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + service.executeQueryPhase(request, task, new ActionListener() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + try { + // make sure that the wrapper is called when the query is actually executed + assertEquals(numWrapReader+1, numWrapInvocations.get()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); } public void testCanRewriteToMatchNone() { @@ -744,18 +779,123 @@ public void testCreateSearchContextFailure() throws IOException { final IndexService indexService = createIndex(index); final SearchService service = getInstanceFromNode(SearchService.class); final ShardId shardId = new ShardId(indexService.index(), 0); + IndexShard indexShard = indexService.getShard(0); - NullPointerException e = expectThrows(NullPointerException.class, - () -> service.createContext( - new ShardSearchRequest(shardId, 0, null) { - @Override - public SearchType searchType() { - // induce an artificial NPE - throw new NullPointerException("expected"); - } + SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite( + new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY) { + @Override + public SearchType searchType() { + // induce an artificial NPE + throw new NullPointerException("expected"); } - )); + }, indexShard); + NullPointerException e = expectThrows(NullPointerException.class, + () -> service.createContext(rewriteContext)); assertEquals("expected", e.getMessage()); assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount()); } + + public void testMatchNoDocsEmptyResponse() throws InterruptedException { + createIndex("index"); + Thread currentThread = Thread.currentThread(); + SearchService service = getInstanceFromNode(SearchService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); + IndexShard indexShard = indexService.getShard(0); + SearchRequest searchRequest = new SearchRequest() + .allowPartialSearchResults(false) + .source(new SearchSourceBuilder() + .aggregation(AggregationBuilders.count("count").field("value"))); + ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 5, AliasFilter.EMPTY, 1.0f, 0, null, null); + SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchAllQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.source().query(new MatchNoneQueryBuilder()); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + assertNotSame(Thread.currentThread(), currentThread); + assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertFalse(result.queryResult().isNull()); + assertNotNull(result.queryResult().topDocs()); + assertNotNull(result.queryResult().aggregations()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception exc) { + try { + throw new AssertionError(exc); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + + { + CountDownLatch latch = new CountDownLatch(1); + shardRequest.setMatchNoDocsReturnNullResponse(true); + service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult result) { + try { + // make sure we don't use the search threadpool + assertSame(Thread.currentThread(), currentThread); + assertThat(result, instanceOf(QuerySearchResult.class)); + assertTrue(result.queryResult().isNull()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(); + } + } } diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 3d06431c554b9..f656c47183abf 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -46,6 +47,7 @@ import org.hamcrest.Matchers; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; @@ -122,26 +124,48 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx int numRefreshes = 0; for (int i = 0; i < numRequests; i++) { numRefreshes++; - switch (randomIntBetween(0, 3)) { + // make sure that we don't share the frozen reader in + // concurrent requests + CountDownLatch reqLatch = new CountDownLatch(1); + switch (randomFrom(Arrays.asList(0, 1, 2))) { case 0: - client().prepareGet("index", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareGet("index", "" + randomIntBetween(0, 9)) + .execute(ActionListener.wrap(() -> { + latch.countDown(); + reqLatch.countDown(); + })); + reqLatch.await(); break; case 1: client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setSearchType(SearchType.QUERY_THEN_FETCH) - .execute(ActionListener.wrap(latch::countDown)); + .execute(ActionListener.wrap(() -> { + latch.countDown(); + reqLatch.countDown(); + })); // in total 4 refreshes 1x query & 1x fetch per shard (we have 2) numRefreshes += 3; + reqLatch.await(); break; case 2: - client().prepareTermVectors("index", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown)); + client().prepareTermVectors("index", "" + randomIntBetween(0, 9)) + .execute(ActionListener.wrap(() -> { + latch.countDown(); + reqLatch.countDown(); + })); + reqLatch.await(); break; case 3: client().prepareExplain("index", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder()) - .execute(ActionListener.wrap(latch::countDown)); + .execute(ActionListener.wrap(() -> { + latch.countDown(); + reqLatch.countDown(); + })); + reqLatch.await(); break; - default: - assert false; + + default: + assert false; } } latch.await(); From 534b5525ed54f1e4efbe6267bb154b9be578a6f7 Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 30 Jan 2020 23:03:28 +0100 Subject: [PATCH 02/10] add serialization test --- .../search/internal/ShardSearchRequestTests.java | 6 +++++- .../elasticsearch/search/query/QuerySearchResultTests.java | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index c913bbe4b9a37..2e2384f0f84fd 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -75,6 +75,8 @@ public void testSerialization() throws Exception { assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias()); assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults()); + assertEquals(deserializedRequest.isMatchNoDocsReturnNullResponse(), + shardSearchTransportRequest.isMatchNoDocsReturnNullResponse()); } private ShardSearchRequest createShardSearchRequest() throws IOException { @@ -88,9 +90,11 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } final String[] routings = generateRandomStringArray(5, 10, false, true); - return new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, + ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings); + req.setMatchNoDocsReturnNullResponse(randomBoolean()); + return req; } public void testFilteringAliases() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index ef49d8e436e3e..c67850436b393 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -99,4 +99,11 @@ public void testSerialization() throws Exception { } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); } + + public void testNullResponse() throws Exception { + QuerySearchResult querySearchResult = QuerySearchResult.nullInstance(); + QuerySearchResult deserialized = + copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, Version.CURRENT); + assertEquals(querySearchResult.isNull(), deserialized.isNull()); + } } From c02f352fe574de396eef1c89f98de74314a35133 Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 30 Jan 2020 23:40:32 +0100 Subject: [PATCH 03/10] iter --- .../java/org/elasticsearch/indices/IndicesRequestCacheIT.java | 2 +- .../java/org/elasticsearch/index/engine/FrozenIndexTests.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java index fa83cb2662a8e..ca660aa1a2134 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheIT.java @@ -125,6 +125,7 @@ public void testQueryRewrite() throws Exception { final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) + // to ensure that query is executed even if it rewrites to match_no_docs .addAggregation(new GlobalAggregationBuilder("global")) .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r1); @@ -133,7 +134,6 @@ public void testQueryRewrite() throws Exception { final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) - // to ensure that query is executed even if it rewrites to match_no_docs .addAggregation(new GlobalAggregationBuilder("global")) .setPreFilterShardSize(Integer.MAX_VALUE).get(); ElasticsearchAssertions.assertAllSuccessful(r2); diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index f656c47183abf..3a976a2cd14b8 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; -import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchRequest; From f5684ecfc7c42588d167d0eec759f9f6f6a1ceb9 Mon Sep 17 00:00:00 2001 From: jimczi Date: Fri, 31 Jan 2020 00:35:04 +0100 Subject: [PATCH 04/10] fix bwc issue --- .../elasticsearch/search/internal/ShardSearchRequest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 47b10f48e0d99..340e8e5295c4c 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -170,10 +170,10 @@ public ShardSearchRequest(StreamInput in) throws IOException { allowPartialSearchResults = in.readBoolean(); indexRoutings = in.readStringArray(); preference = in.readOptionalString(); - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { matchNoDocsReturnNullResponse = in.readBoolean(); } else { - matchNoDocsReturnNullResponse = true; + matchNoDocsReturnNullResponse = false; } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -209,7 +209,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeStringArray(indexRoutings); out.writeOptionalString(preference); } - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { out.writeBoolean(matchNoDocsReturnNullResponse); } } From 0acf244b16fc4a7e9bbfeb64160b71d1ae1a7535 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 11:17:08 +0100 Subject: [PATCH 05/10] address review --- .../action/search/AbstractSearchAsyncAction.java | 4 ++-- .../org/elasticsearch/search/SearchService.java | 4 ++-- .../search/internal/ShardSearchRequest.java | 16 ++++++++-------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index bc7b347531ad0..bd1a3bdd3123c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -467,9 +467,9 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg protected void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getShardIndex() != -1 : "shard index is not set"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; - hasShardResponse.set(true); successfulOps.incrementAndGet(); results.consumeResult(result); + hasShardResponse.set(true); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null); } @@ -610,7 +610,7 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather // than creating an empty response in the search thread pool. - shardRequest.setMatchNoDocsReturnNullResponse(hasShardResponse.get()); + shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get()); return shardRequest; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ffca1b96ab630..2d55e86c2fa3a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -360,13 +360,13 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { - assert request.isMatchNoDocsReturnNullResponse() == false || request.numberOfShards() > 1 + assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; rewriteShardRequest(request, ActionListener.wrap( context -> { try { ShardSearchRequest rewritten = context.request; - if (rewritten.isMatchNoDocsReturnNullResponse() + if (rewritten.canReturnNullResponseIfMatchNoDocs() && canRewriteToMatchNone(rewritten.source()) && rewritten.source().query() instanceof MatchNoneQueryBuilder) { onMatchNoDocs(context, listener); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 340e8e5295c4c..f46be3755cee5 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -76,7 +76,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final String preference; private final OriginalIndices originalIndices; - private boolean matchNoDocsReturnNullResponse; + private boolean canReturnNullResponseIfMatchNoDocs; //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; @@ -171,9 +171,9 @@ public ShardSearchRequest(StreamInput in) throws IOException { indexRoutings = in.readStringArray(); preference = in.readOptionalString(); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - matchNoDocsReturnNullResponse = in.readBoolean(); + canReturnNullResponseIfMatchNoDocs = in.readBoolean(); } else { - matchNoDocsReturnNullResponse = false; + canReturnNullResponseIfMatchNoDocs = false; } originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -210,7 +210,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeOptionalString(preference); } if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeBoolean(matchNoDocsReturnNullResponse); + out.writeBoolean(canReturnNullResponseIfMatchNoDocs); } } @@ -291,12 +291,12 @@ public String preference() { * Defaults to false since the coordinator node needs at least one shard response to build the global * response. */ - public boolean isMatchNoDocsReturnNullResponse() { - return matchNoDocsReturnNullResponse; + public boolean canReturnNullResponseIfMatchNoDocs() { + return canReturnNullResponseIfMatchNoDocs; } - public void setMatchNoDocsReturnNullResponse(boolean value) { - this.matchNoDocsReturnNullResponse = value; + public void canReturnNullResponseIfMatchNoDocs(boolean value) { + this.canReturnNullResponseIfMatchNoDocs = value; } /** From 6016fa43c844868940c1cf4f5b8572ab69093d52 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 11:49:40 +0100 Subject: [PATCH 06/10] adapt test --- .../test/java/org/elasticsearch/search/SearchServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 75435f40f7dfb..c9dc231f0997b 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -872,7 +872,7 @@ public void onFailure(Exception exc) { { CountDownLatch latch = new CountDownLatch(1); - shardRequest.setMatchNoDocsReturnNullResponse(true); + shardRequest.canReturnNullResponseIfMatchNoDocs(true); service.executeQueryPhase(shardRequest, task, new ActionListener<>() { @Override public void onResponse(SearchPhaseResult result) { From a0581270a25c6a11182a5eae9ec2cdeec39e2a07 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 11:52:09 +0100 Subject: [PATCH 07/10] fix test --- .../search/internal/ShardSearchRequestTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index 2e2384f0f84fd..2d0aa2f591d8d 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -75,8 +75,8 @@ public void testSerialization() throws Exception { assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias()); assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults()); - assertEquals(deserializedRequest.isMatchNoDocsReturnNullResponse(), - shardSearchTransportRequest.isMatchNoDocsReturnNullResponse()); + assertEquals(deserializedRequest.canReturnNullResponseIfMatchNoDocs(), + shardSearchTransportRequest.canReturnNullResponseIfMatchNoDocs()); } private ShardSearchRequest createShardSearchRequest() throws IOException { @@ -93,7 +93,7 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings); - req.setMatchNoDocsReturnNullResponse(randomBoolean()); + req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); return req; } From 8534ed25df43ced97d8ff983caaaec115f6d0ea6 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 15:24:37 +0100 Subject: [PATCH 08/10] fix topNSize when size is reset to 0 --- .../action/search/SearchPhaseController.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 06bb29a51ce6c..59a5082ffe922 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; @@ -507,6 +508,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection= 2 if there is more than one expected result"); @@ -620,6 +634,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search this.hasAggs = hasAggs; this.bufferSize = bufferSize; this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo); + this.topNSize = topNSize; this.performFinalReduce = performFinalReduce; } @@ -643,7 +658,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (hasTopDocs) { TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), // we have to merge here in the same way we collect on a shard - querySearchResult.from() + querySearchResult.size(), 0); + topNSize, 0); Arrays.fill(topDocsBuffer, null); topDocsBuffer[0] = reducedTopDocs; } @@ -718,9 +733,10 @@ ArraySearchPhaseResults newSearchPhaseResults(SearchProgressL if (isScrollRequest == false && (hasAggs || hasTopDocs)) { // no incremental reduce if scroll is used - we only hit a single shard or sometimes more... if (request.getBatchedReduceSize() < numShards) { + int topNSize = getTopDocsSize(request); // only use this if there are aggs and if there are more shards than we should reduce at once return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, - trackTotalHitsUpTo, request.isFinalReduce()); + trackTotalHitsUpTo, topNSize, request.isFinalReduce()); } } return new ArraySearchPhaseResults(numShards) { From 27cdf19dfe2c98cd9939cb15f1cab7b34a6e5254 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 20:44:59 +0100 Subject: [PATCH 09/10] add more comments --- .../src/main/java/org/elasticsearch/search/SearchService.java | 1 + .../java/org/elasticsearch/index/engine/FrozenIndexTests.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 2d55e86c2fa3a..1750fe41e0d07 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1125,6 +1125,7 @@ private void rewriteShardRequest(ShardSearchRequest request, ActionListener { try { + // we can now acquire a searcher and rewrite the request with it SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard); listener.onResponse(rewriteContext); } catch (Exception e) { diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 3a976a2cd14b8..2bcaf0a926609 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -123,8 +123,8 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx int numRefreshes = 0; for (int i = 0; i < numRequests; i++) { numRefreshes++; - // make sure that we don't share the frozen reader in - // concurrent requests + // make sure that we don't share the frozen reader in concurrent requests since we acquire the + // searcher and rewrite the request outside of the search-throttle thread pool CountDownLatch reqLatch = new CountDownLatch(1); switch (randomFrom(Arrays.asList(0, 1, 2))) { case 0: From 662972cf2de9b2ad36077b886c4930c8d2a08468 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 3 Feb 2020 21:36:16 +0100 Subject: [PATCH 10/10] more review --- .../search/AbstractSearchAsyncAction.java | 4 +-- .../index/engine/FrozenIndexTests.java | 32 ++++--------------- 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index bd1a3bdd3123c..d4d313c0afab1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -76,7 +76,7 @@ abstract class AbstractSearchAsyncAction exten **/ private final BiFunction nodeIdToConnection; private final SearchTask task; - protected final SearchPhaseResults results; + private final SearchPhaseResults results; private final long clusterStateVersion; private final Map aliasFilter; private final Map concreteIndexBoosts; @@ -464,7 +464,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg * @param result the result returned form the shard * @param shardIt the shard iterator */ - protected void onShardResult(Result result, SearchShardIterator shardIt) { + private void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getShardIndex() != -1 : "shard index is not set"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; successfulOps.incrementAndGet(); diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 2bcaf0a926609..eb89391fac177 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.index.engine; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -49,7 +48,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; -import java.util.concurrent.CountDownLatch; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -119,55 +117,37 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx } assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); int numRequests = randomIntBetween(20, 50); - CountDownLatch latch = new CountDownLatch(numRequests); int numRefreshes = 0; for (int i = 0; i < numRequests; i++) { numRefreshes++; // make sure that we don't share the frozen reader in concurrent requests since we acquire the // searcher and rewrite the request outside of the search-throttle thread pool - CountDownLatch reqLatch = new CountDownLatch(1); switch (randomFrom(Arrays.asList(0, 1, 2))) { case 0: client().prepareGet("index", "" + randomIntBetween(0, 9)) - .execute(ActionListener.wrap(() -> { - latch.countDown(); - reqLatch.countDown(); - })); - reqLatch.await(); + .get(); break; case 1: client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setSearchType(SearchType.QUERY_THEN_FETCH) - .execute(ActionListener.wrap(() -> { - latch.countDown(); - reqLatch.countDown(); - })); + .get(); // in total 4 refreshes 1x query & 1x fetch per shard (we have 2) numRefreshes += 3; - reqLatch.await(); break; case 2: client().prepareTermVectors("index", "" + randomIntBetween(0, 9)) - .execute(ActionListener.wrap(() -> { - latch.countDown(); - reqLatch.countDown(); - })); - reqLatch.await(); + .get(); break; case 3: - client().prepareExplain("index", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder()) - .execute(ActionListener.wrap(() -> { - latch.countDown(); - reqLatch.countDown(); - })); - reqLatch.await(); + client().prepareExplain("index", "" + randomIntBetween(0, 9)) + .setQuery(new MatchAllQueryBuilder()) + .get(); break; default: assert false; } } - latch.await(); IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(numRefreshes, index.getTotal().refresh.getTotal()); }