Skip to content

Commit

Permalink
Shortcut query phase using the results of other shards (#51852) (#53659)
Browse files Browse the repository at this point in the history
This commit, built on top of #51708, allows to modify shard search requests based on informations collected on other shards. It is intended to speed up sorted queries on time-based indices. For queries that are only interested in the top documents.

This change will rewrite the shard queries to match none if the bottom sort value computed in prior shards is better than all values in the shard.
For queries that mix top documents and aggregations this change will reset the size of the top documents to 0 instead of rewriting to match none.
This means that we don't need to keep a search context open for this shard since we know in advance that it doesn't contain any competitive hit.
  • Loading branch information
jimczi committed Mar 18, 2020
1 parent ae19802 commit 8e17322
Show file tree
Hide file tree
Showing 19 changed files with 1,116 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
if (request.source().slice() != null) {
throw new IllegalStateException("Can't slice a request that already has a slice configuration");
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
slicedSource = request.source().shallowCopy().slice(sliceBuilder);
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ setup:
created_at:
type: date
format: "strict_date"

- do:
indices.create:
index: index_3
Expand Down Expand Up @@ -154,31 +155,3 @@ setup:
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- length: { aggregations.idx_terms.buckets: 0 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
setup:
- do:
indices.create:
index: index_1
body:
settings:
number_of_shards: 1
mappings:
properties:
created_at:
type: date
format: "strict_date"
- do:
indices.create:
index: index_2
body:
settings:
number_of_shards: 1
mappings:
properties:
created_at:
type: date
format: "strict_date"

- do:
indices.create:
index: index_3
body:
settings:
number_of_shards: 1
mappings:
properties:
created_at:
type: date
format: "strict_date"


---
"test distributed sort can rewrite query to match no docs":

- skip:
version: " - 7.6.99"
reason: "distributed sort optimization was added in 7.7.0"
- do:
index:
index: index_1
id: 1
body: { "created_at": "2016-01-01"}
- do:
index:
index: index_2
id: 2
body: { "created_at": "2017-01-01" }

- do:
index:
index: index_3
id: 3
body: { "created_at": "2018-01-01" }
- do:
indices.refresh: {}

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 0 }
- length: { aggregations.idx_terms.buckets: 0 }

# check field sort is correct when skipping query phase
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc" } }]

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: { hits.hits.0._id: "3" }

# same with aggs
- do:
search:
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
pre_filter_shard_size: 1
body:
"size": 1
"track_total_hits": 1
"sort": [{ "created_at": { "order": "desc" } }]
"aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total.value: 1 }
- match: { hits.total.relation: "gte" }
- length: { hits.hits: 1 }
- match: {hits.hits.0._id: "3" }
- length: { aggregations.idx_terms.buckets: 3 }
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
Expand Down Expand Up @@ -467,7 +467,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg
* @param result the result returned form the shard
* @param shardIt the shard iterator
*/
private void onShardResult(Result result, SearchShardIterator shardIt) {
protected void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValuesAndFormats;

/**
* Utility class to keep track of the bottom doc's sort values in a distributed search.
*/
class BottomSortValuesCollector {
private final int topNSize;
private final SortField[] sortFields;
private final FieldComparator[] comparators;
private final int[] reverseMuls;

private volatile long totalHits;
private volatile SearchSortValuesAndFormats bottomSortValues;

BottomSortValuesCollector(int topNSize, SortField[] sortFields) {
this.topNSize = topNSize;
this.comparators = new FieldComparator[sortFields.length];
this.reverseMuls = new int[sortFields.length];
this.sortFields = sortFields;
for (int i = 0; i < sortFields.length; i++) {
comparators[i] = sortFields[i].getComparator(1, i);
reverseMuls[i] = sortFields[i].getReverse() ? -1 : 1;
}
}

long getTotalHits() {
return totalHits;
}

/**
* @return The best bottom sort values consumed so far.
*/
SearchSortValuesAndFormats getBottomSortValues() {
return bottomSortValues;
}

synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
totalHits += topDocs.totalHits.value;
if (validateShardSortFields(topDocs.fields) == false) {
return;
}

FieldDoc shardBottomDoc = extractBottom(topDocs);
if (shardBottomDoc == null) {
return;
}
if (bottomSortValues == null
|| compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
}
}

/**
* @return <code>false</code> if the provided {@link SortField} array differs
* from the initial {@link BottomSortValuesCollector#sortFields}.
*/
private boolean validateShardSortFields(SortField[] shardSortFields) {
for (int i = 0; i < shardSortFields.length; i++) {
if (shardSortFields[i].equals(sortFields[i]) == false) {
// ignore shards response that would make the sort incompatible
// (e.g.: mixing keyword/numeric or long/double).
// TODO: we should fail the entire request because the topdocs
// merge will likely fail later but this is not possible with
// the current async logic that only allows shard failures here.
return false;
}
}
return true;
}

private FieldDoc extractBottom(TopFieldDocs topDocs) {
return topNSize > 0 && topDocs.scoreDocs.length == topNSize ?
(FieldDoc) topDocs.scoreDocs[topNSize-1] : null;
}

private int compareValues(Object[] v1, Object[] v2) {
for (int i = 0; i < v1.length; i++) {
int cmp = reverseMuls[i] * comparators[i].compareValues(v1[i], v2[i]);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand Down Expand Up @@ -725,15 +726,6 @@ int getNumBuffered() {
int getNumReducePhases() { return numReducePhases; }
}

private int resolveTrackTotalHits(SearchRequest request) {
if (request.scroll() != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo();
}

/**
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
Expand All @@ -744,7 +736,7 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final int trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = requestToAggReduceContextBuilder.apply(request);
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
Expand Down
Loading

0 comments on commit 8e17322

Please sign in to comment.