diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 62d277560f77c..1bab293c99b43 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -339,54 +339,59 @@ private void testMixedVersionsShardsSearch(VersionInformation oldVersion, Versio SearchTransportService searchTransportService = new SearchTransportService(null, null, null); SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( - searchRequest, - EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), - controller, - task::isCancelled, - task.getProgressListener(), - shardsIter.size(), - exc -> {} - ); - final List responses = new ArrayList<>(); - SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction( - logger, - null, - searchTransportService, - (clusterAlias, node) -> lookup.get(node), - Collections.singletonMap("_na_", AliasFilter.EMPTY), - Collections.emptyMap(), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - resultConsumer, - searchRequest, - new ActionListener<>() { - @Override - public void onFailure(Exception e) { - responses.add(e); - } + try ( + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task::isCancelled, + task.getProgressListener(), + shardsIter.size(), + exc -> {} + ) + ) { + final List responses = new ArrayList<>(); + SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction( + logger, + null, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", AliasFilter.EMPTY), + Collections.emptyMap(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + resultConsumer, + searchRequest, + new ActionListener<>() { + @Override + public void onFailure(Exception e) { + responses.add(e); + } - public void onResponse(SearchResponse response) { - responses.add(response); - }; - }, - shardsIter, - timeProvider, - new ClusterState.Builder(new ClusterName("test")).build(), - task, - SearchResponse.Clusters.EMPTY, - null - ); + public void onResponse(SearchResponse response) { + responses.add(response); + } - newSearchAsyncAction.start(); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class)); - SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0); - assertThat(e.getCause(), instanceOf(VersionMismatchException.class)); - assertThat( - e.getCause().getMessage(), - equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]") - ); + ; + }, + shardsIter, + timeProvider, + new ClusterState.Builder(new ClusterName("test")).build(), + task, + SearchResponse.Clusters.EMPTY, + null + ); + + newSearchAsyncAction.start(); + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class)); + SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0); + assertThat(e.getCause(), instanceOf(VersionMismatchException.class)); + assertThat( + e.getCause().getMessage(), + equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]") + ); + } } public void testMinimumVersionSameAsOldVersion() throws Exception { @@ -639,77 +644,84 @@ public void sendExecuteQuery( }; SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( - searchRequest, - EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), - controller, - task::isCancelled, - task.getProgressListener(), - shardsIter.size(), - exc -> {} - ); + CountDownLatch latch = new CountDownLatch(1); - SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( - logger, - null, - searchTransportService, - (clusterAlias, node) -> lookup.get(node), - Collections.singletonMap("_na_", AliasFilter.EMPTY), - Collections.emptyMap(), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - resultConsumer, - searchRequest, - null, - shardsIter, - timeProvider, - new ClusterState.Builder(new ClusterName("test")).build(), - task, - SearchResponse.Clusters.EMPTY, - null + try ( + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task::isCancelled, + task.getProgressListener(), + shardsIter.size(), + exc -> {} + ) ) { - @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { - return new SearchPhase("test") { - @Override - public void run() { - latch.countDown(); - } - }; - } - }; - ShardRouting routingOldVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 2), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - SearchShardIterator shardIt = new SearchShardIterator( - null, - new ShardId(new Index("idx", "_na_"), 2), - singletonList(routingOldVersionShard), - idx - ); - routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0); - routingOldVersionShard.started(); - action.start(); - latch.await(); - assertThat(successfulOps.get(), equalTo(2)); - SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); - assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); - assertThat(phase.totalHits().value, equalTo(2L)); - assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); + SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( + logger, + null, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", AliasFilter.EMPTY), + Collections.emptyMap(), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + resultConsumer, + searchRequest, + null, + shardsIter, + timeProvider, + new ClusterState.Builder(new ClusterName("test")).build(), + task, + SearchResponse.Clusters.EMPTY, + null + ) { + @Override + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new SearchPhase("test") { + @Override + public void run() { + latch.countDown(); + } + }; + } + }; + ShardRouting routingOldVersionShard = ShardRouting.newUnassigned( + new ShardId(new Index("idx", "_na_"), 2), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), + ShardRouting.Role.DEFAULT + ); + SearchShardIterator shardIt = new SearchShardIterator( + null, + new ShardId(new Index("idx", "_na_"), 2), + singletonList(routingOldVersionShard), + idx + ); + routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0); + routingOldVersionShard.started(); + action.start(); + latch.await(); + assertThat(successfulOps.get(), equalTo(2)); + SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); + assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); + assertThat(phase.totalHits().value, equalTo(2L)); + assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); - SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null); - SearchActionListener listener = new SearchActionListener(searchShardTarget, 0) { - @Override - public void onFailure(Exception e) {} + SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null); + SearchActionListener listener = new SearchActionListener(searchShardTarget, 0) { + @Override + public void onFailure(Exception e) {} - @Override - protected void innerOnResponse(SearchPhaseResult response) {} - }; - Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)); - assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + @Override + protected void innerOnResponse(SearchPhaseResult response) {} + }; + Exception e = expectThrows( + VersionMismatchException.class, + () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener) + ); + assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); + } } }