diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java index 5c8f3793f633b..cee7e96b4a6f6 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java @@ -73,7 +73,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ @Override protected void moveToSecondPhase() throws Exception { // no need to sort, since we know we have no hits back - final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty()); + final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty(), request); String scrollId = null; if (request.scroll() != null) { scrollId = buildScrollId(request.searchType(), firstResults, null); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 021f72a1d103c..d2ee973395a1c 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -180,7 +180,7 @@ void finishHim() { void innerFinishHim() throws Exception { sortedShardList = searchPhaseController.sortDocs(queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, request); String scrollId = null; if (request.scroll() != null) { scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 706f3d0905836..bbefa6a709b5b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -289,7 +289,7 @@ void finishHim() { } void innerFinishHim() throws Exception { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, request); String scrollId = null; if (request.scroll() != null) { scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java index aab83aed1d0ad..1c9318de65c80 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java @@ -86,7 +86,7 @@ protected void moveToSecondPhase() throws Exception { private void innerFinishHim() throws IOException { sortedShardList = searchPhaseController.sortDocs(firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults, request); String scrollId = null; if (request.scroll() != null) { scrollId = buildScrollId(request.searchType(), firstResults, null); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a3ace10d496f8..983fee82994bf 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -190,7 +190,7 @@ void finishHim() { } void innerFinishHim() throws Exception { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); + InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults, request); String scrollId = null; if (request.scroll() != null) { scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java index 3b289790c647b..79cb584bf7f9c 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java @@ -70,7 +70,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ @Override protected void moveToSecondPhase() throws Exception { - final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty()); + final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray) AtomicArray.empty(), request); String scrollId = null; if (request.scroll() != null) { scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits()))); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 2da79f230f2de..c1a01ac51f731 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -242,7 +242,7 @@ private void finishHim() { private void innerFinishHim() { ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, null); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index b398fab16fcad..3cc249421fc2b 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -273,7 +273,7 @@ private void finishHim() { } private void innerFinishHim() { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, null); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index d2017ab59924f..3275ee37f287e 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -260,7 +260,7 @@ private void innerFinishHim() throws IOException { docs[counter++] = scoreDoc; } } - final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults, null); ((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits")); diff --git a/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java b/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java index 56eb951a3ecf4..9942855427eb3 100644 --- a/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java +++ b/src/main/java/org/elasticsearch/action/suggest/TransportSuggestAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -62,13 +63,16 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction o1, At private final CacheRecycler cacheRecycler; private final boolean optimizeSingleShard; + private final Client client; @Inject - public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler) { + public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, Client client) { super(settings); this.cacheRecycler = cacheRecycler; this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true); + this.client = client; } public boolean optimizeSingleShard() { @@ -296,7 +300,7 @@ public void fillDocIdsToLoad(AtomicArray docsIdsToLoad, ScoreD } } - public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray queryResultsArr, AtomicArray fetchResultsArr) { + public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray queryResultsArr, AtomicArray fetchResultsArr, SearchRequest request) { List> queryResults = queryResultsArr.asList(); List> fetchResults = fetchResultsArr.asList(); @@ -411,7 +415,14 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray> group(Map>> reduce(Map> groupedSuggestions) { + public static List>> reduce(Map> groupedSuggestions, ReduceContext context) { List>> reduced = new ArrayList>>(groupedSuggestions.size()); for (java.util.Map.Entry> unmergedResults : groupedSuggestions.entrySet()) { List value = unmergedResults.getValue(); Suggestion reduce = value.get(0).reduce(value); + reduce.filter(context); reduce.trim(); reduced.add(reduce); } @@ -265,7 +269,14 @@ public Suggestion reduce(List> toReduce) { protected Comparator