From 44012c18ecbabdc5e8943277e3fe4988551d9601 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 20 Aug 2019 15:23:48 +0200 Subject: [PATCH 1/9] Disallow partial results when shard unavailable Searching with `allowPartialSearchResults=false` could still return partial search results during recovery. If the last shard copy fails with a "shard not available" exception, the failure would be ignored and a partial result returned. The one case where this is known to happen is when the last shard copy is recovering when searching, since `IllegalIndexShardStateException` is considered a "shard not available" exception. Relates to #42612 --- .../action/search/InitialSearchPhase.java | 21 +++- .../AbstractSearchAsyncActionTests.java | 28 ++++- .../CanMatchPreFilterSearchPhaseTests.java | 5 + .../search/basic/SearchRedStateIndexIT.java | 108 ++++++++++++++++-- 4 files changed, 144 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 6b9efe30fb791..bb5e2cb0d7c1e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -86,13 +86,21 @@ abstract class InitialSearchPhase extends this.executor = executor; } - private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, + void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId); onShardFailure(shardIndex, shardTarget, e); + final ShardRouting nextShard = shardIt.nextOrNull(); + final boolean lastShard = nextShard == null; + 
if (lastShard && request.allowPartialSearchResults() == Boolean.FALSE) { + onPhaseFailure(this, + "All shard copies failed for " + shardIt.shardId() + ". Consider using `allow_partial_search_results` " + + "setting to bypass this error.", null); + return; + } if (totalOps.incrementAndGet() == expectedTotalOps) { if (logger.isDebugEnabled()) { if (e != null && !TransportActions.isShardNotAvailableException(e)) { @@ -104,8 +112,6 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, } onPhaseDone(); } else { - final ShardRouting nextShard = shardIt.nextOrNull(); - final boolean lastShard = nextShard == null; // trace log this exception logger.trace(() -> new ParameterizedMessage( "{}: Failed to execute [{}] lastShard [{}]", @@ -331,6 +337,15 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { */ abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim() + /** + * This method will communicate a fatal phase failure back to the user. In contrast to a shard failure + * will this method immediately fail the search request and return the failure to the issuer of the request + * @param phase the phase that failed + * @param msg an optional message + * @param cause the cause of the phase failure + */ + abstract void onPhaseFailure(SearchPhase phase, String msg, Throwable cause); + /** * Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given * shard target. 
diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 16df17bef1ada..db40eb824d3c2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -83,12 +84,13 @@ private AbstractSearchAsyncAction createAction(SearchRequest return null; }; - return new AbstractSearchAsyncAction("test", null, null, nodeIdToConnection, + return new AbstractSearchAsyncAction("test", logger, null, nodeIdToConnection, Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), null, request, listener, new GroupShardsIterator<>( - Collections.singletonList( - new SearchShardIterator(null, null, Collections.emptyList(), null) + List.of( + new SearchShardIterator(null, new ShardId("test", "testUUID", 0), Collections.emptyList(), null), + new SearchShardIterator(null, new ShardId("test", "testUUID", 0), Collections.emptyList(), null) ) ), timeProvider, 0, null, results, request.getMaxConcurrentShardRequests(), @@ -239,6 +241,26 @@ public void run() { assertEquals(requestIds, releasedContexts); } + public void testOnShardNotAvailableDisallowPartialFailures() { + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener 
listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + InitialSearchPhase.ArraySearchPhaseResults phaseResults = new InitialSearchPhase.ArraySearchPhaseResults<>(10); + AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); + ShardId shardId = new ShardId("test", "testuuid", randomInt(10)); + // IllegalIndexShardStateException is considered a shard not available exception + action.onShardFailure(0, null, null, new SearchShardIterator(null, shardId, Collections.emptyList(), null), + new IllegalIndexShardStateException(null, null, "shard failure")); + assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class)); + SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException)exception.get(); + assertEquals("All shard copies failed for " + shardId + + ". Consider using `allow_partial_search_results` setting to bypass this error.", + searchPhaseExecutionException.getMessage()); + assertEquals("test", searchPhaseExecutionException.getPhaseName()); + assertEquals(0, searchPhaseExecutionException.shardFailures().length); + assertEquals(0, searchPhaseExecutionException.getSuppressed().length); + } + private static InitialSearchPhase.ArraySearchPhaseResults phaseResults(Set requestIds, List> nodeLookups, int numFailures) { diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 44fe3f92c615a..db5356d44a898 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -225,6 +225,11 @@ void onShardFailure(final int shardIndex, final SearchShardTarget shardTarget, f } + @Override + void onPhaseFailure(SearchPhase phase, String msg, 
Throwable cause) { + + } + @Override void onShardSuccess(final SearchPhaseResult result) { diff --git a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index 676e50dfd5688..1c97e2b8489ef 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -28,12 +28,16 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESIntegTestCase; import org.junit.After; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -44,7 +48,6 @@ @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchRedStateIndexIT extends ESIntegTestCase { - public void testAllowPartialsWithRedState() throws Exception { final int numShards = cluster().numDataNodes()+2; buildRedIndex(numShards); @@ -95,6 +98,80 @@ public void testClusterDisallowPartialsWithRedState() throws Exception { assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shard")); } + public void testDisallowPartialsWithRedStateRecovering() throws Exception { + int docCount = scaledRandomIntBetween(1000, 10000); + logger.info("Using docCount [{}]", docCount); + buildIndex(cluster().numDataNodes(), 1, docCount); + + AtomicBoolean stop = new AtomicBoolean(); + List searchThreads = new ArrayList<>(); + // this is a little extreme, but necessary to make this test 
fail reasonably often (half the runs on my machine). + for (int i = 0; i < 100; ++i) { + Thread searchThread = new Thread() { + { + setDaemon(true); + } + + @Override + public void run() { + while (stop.get() == false) { + verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)).setSize(100).setAllowPartialSearchResults(false).get()); + verify(() -> client().prepareSearch("test").setSize(100).setAllowPartialSearchResults(false).get()); + } + } + + void verify(Supplier call) { + try { + SearchResponse response = call.get(); + assertThat(response.getHits().getHits().length, equalTo(100)); + assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount)); + } catch (Exception e) { + // this is OK. + logger.info("Failed with : " + e); + } + } + }; + searchThreads.add(searchThread); + searchThread.start(); + } + try { + Thread restartThread = new Thread() { + { + setDaemon(true); + } + + @Override + public void run() { + try { + for (int i = 0; i < 5; ++i) { + internalCluster().restartRandomDataNode(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + restartThread.start(); + for (int i = 0; i < 5; ++i) { + internalCluster().restartRandomDataNode(); + } + restartThread.join(30000); + assertFalse(restartThread.isAlive()); + } finally { + stop.set(true); + } + searchThreads.forEach(thread -> { + try { + thread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // hack to ensure all search contexts are removed, seems we risk leaked search contexts when coordinator dies. 
+ client().admin().indices().prepareDelete("test").get(); + } + private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) { String key = SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey(); @@ -110,28 +187,35 @@ private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) { } private void buildRedIndex(int numShards) throws Exception { - assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", - numShards).put("index.number_of_replicas", 0))); + buildIndex(numShards, 0, 10); + + stopNodeAndEnsureRed(); + } + + private void buildIndex(int numShards, int numReplicas, int docCount) { + assertAcked(prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas))); ensureGreen(); - for (int i = 0; i < 10; i++) { - client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").get(); + for (int i = 0; i < docCount; i++) { + client().prepareIndex("test", "type1", ""+i).setSource("field1", i).get(); } refresh(); - - internalCluster().stopRandomDataNode(); - + } + + private void stopNodeAndEnsureRed() throws Exception { + internalCluster().stopRandomDataNode(); + client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get(); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); List unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED); assertThat(unassigneds.size(), greaterThan(0)); - }); - + }); } - + @After - public void cleanup() throws Exception { + public void cleanup() { assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().putNull(SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey()))); } From f4d63bbfacd756cba143b6da5bea108311976910 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 20 Aug 2019 16:43:20 +0200 
Subject: [PATCH 2/9] Fix checkstyle. --- .../org/elasticsearch/search/basic/SearchRedStateIndexIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index 1c97e2b8489ef..5d87162a678f0 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -115,7 +115,8 @@ public void testDisallowPartialsWithRedStateRecovering() throws Exception { @Override public void run() { while (stop.get() == false) { - verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)).setSize(100).setAllowPartialSearchResults(false).get()); + verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)) + .setSize(100).setAllowPartialSearchResults(false).get()); verify(() -> client().prepareSearch("test").setSize(100).setAllowPartialSearchResults(false).get()); } } From 27b5de7a4a302c30cac728580a880cd5cae58dc8 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 21 Aug 2019 10:43:04 +0200 Subject: [PATCH 3/9] Redo check for partial failure Check for partial failure by comparing successful == num-shards to avoid bailing out early in the search phase. Fixed race condition in MockSearchService. 
--- .../search/AbstractSearchAsyncAction.java | 10 +++++++ .../action/search/InitialSearchPhase.java | 21 +++------------ .../AbstractSearchAsyncActionTests.java | 18 +++++++------ .../CanMatchPreFilterSearchPhaseTests.java | 5 ---- .../search/basic/SearchRedStateIndexIT.java | 27 ++++++++++++------- .../search/MockSearchService.java | 2 +- 6 files changed, 42 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 8475272a5e2cd..4f35e2a956df9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -156,6 +156,16 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha return; } } + if (allowPartialResults == false && successfulOps.get() != getNumShards()) { + int discrepancy = getNumShards() - successfulOps.get(); + assert discrepancy > 0 : "discrepancy: " + discrepancy; + if (logger.isDebugEnabled()) { + logger.debug("Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})", + discrepancy, successfulOps.get(), skippedOps.get(), getNumShards(), currentPhase.getName()); + } + onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null); + return; + } if (logger.isTraceEnabled()) { final String resultsFrom = results.getSuccessfulResults() .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(",")); diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index bb5e2cb0d7c1e..6b9efe30fb791 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ 
b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -86,21 +86,13 @@ abstract class InitialSearchPhase extends this.executor = executor; } - void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, + private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId); onShardFailure(shardIndex, shardTarget, e); - final ShardRouting nextShard = shardIt.nextOrNull(); - final boolean lastShard = nextShard == null; - if (lastShard && request.allowPartialSearchResults() == Boolean.FALSE) { - onPhaseFailure(this, - "All shard copies failed for " + shardIt.shardId() + ". Consider using `allow_partial_search_results` " + - "setting to bypass this error.", null); - return; - } if (totalOps.incrementAndGet() == expectedTotalOps) { if (logger.isDebugEnabled()) { if (e != null && !TransportActions.isShardNotAvailableException(e)) { @@ -112,6 +104,8 @@ void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullabl } onPhaseDone(); } else { + final ShardRouting nextShard = shardIt.nextOrNull(); + final boolean lastShard = nextShard == null; // trace log this exception logger.trace(() -> new ParameterizedMessage( "{}: Failed to execute [{}] lastShard [{}]", @@ -337,15 +331,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { */ abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim() - /** - * This method will communicate a fatal phase failure back to the user. 
In contrast to a shard failure - * will this method immediately fail the search request and return the failure to the issuer of the request - * @param phase the phase that failed - * @param msg an optional message - * @param cause the cause of the phase failure - */ - abstract void onPhaseFailure(SearchPhase phase, String msg, Throwable cause); - /** * Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given * shard target. diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index db40eb824d3c2..b37c218dce534 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -245,16 +244,19 @@ public void testOnShardNotAvailableDisallowPartialFailures() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); - InitialSearchPhase.ArraySearchPhaseResults phaseResults = new InitialSearchPhase.ArraySearchPhaseResults<>(10); + int numShards = randomIntBetween(2, 10); + InitialSearchPhase.ArraySearchPhaseResults phaseResults = + new InitialSearchPhase.ArraySearchPhaseResults<>(numShards); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, 
listener, false, new AtomicLong()); - ShardId shardId = new ShardId("test", "testuuid", randomInt(10)); - // IllegalIndexShardStateException is considered a shard not available exception - action.onShardFailure(0, null, null, new SearchShardIterator(null, shardId, Collections.emptyList(), null), - new IllegalIndexShardStateException(null, null, "shard failure")); + // skip one to avoid the "all shards failed" failure. + SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null); + skipIterator.resetAndSkip(); + action.skipShard(skipIterator); + // expect at least 2 shards, so onPhaseDone should report failure. + action.onPhaseDone(); assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class)); SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException)exception.get(); - assertEquals("All shard copies failed for " + shardId - + ". Consider using `allow_partial_search_results` setting to bypass this error.", + assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)", searchPhaseExecutionException.getMessage()); assertEquals("test", searchPhaseExecutionException.getPhaseName()); assertEquals(0, searchPhaseExecutionException.shardFailures().length); diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index db5356d44a898..44fe3f92c615a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -225,11 +225,6 @@ void onShardFailure(final int shardIndex, final SearchShardTarget shardTarget, f } - @Override - void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { - - } - @Override void onShardSuccess(final SearchPhaseResult result) { diff --git 
a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index 5d87162a678f0..26c879da37059 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; @@ -115,9 +116,12 @@ public void testDisallowPartialsWithRedStateRecovering() throws Exception { @Override public void run() { while (stop.get() == false) { + // todo: the timeouts below should not be necessary, but this test sometimes hangs without them and that is not + // the immediate purpose of the test. verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)) - .setSize(100).setAllowPartialSearchResults(false).get()); - verify(() -> client().prepareSearch("test").setSize(100).setAllowPartialSearchResults(false).get()); + .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); + verify(() -> client().prepareSearch("test") + .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); } } @@ -160,14 +164,19 @@ public void run() { assertFalse(restartThread.isAlive()); } finally { stop.set(true); + searchThreads.forEach(thread -> { + try { + thread.join(30000); + if (thread.isAlive()) { + logger.warn("Thread: " + thread + " is still alive"); + // do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me... 
+ thread.join(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); } - searchThreads.forEach(thread -> { - try { - thread.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); // hack to ensure all search contexts are removed, seems we risk leaked search contexts when coordinator dies. client().admin().indices().prepareDelete("test").get(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index b9d9ff3cfc9bb..c1ebb1213495b 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -74,8 +74,8 @@ public MockSearchService(ClusterService clusterService, @Override protected void putContext(SearchContext context) { - super.putContext(context); addActiveContext(context); + super.putContext(context); } @Override From f4fe7e2a38b8fc2a796cc1238fc5aeabf9dabb8d Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 21 Aug 2019 10:48:15 +0200 Subject: [PATCH 4/9] Revert unnecessary change. 
--- .../action/search/AbstractSearchAsyncActionTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index b37c218dce534..0eddb6366c383 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -87,9 +87,8 @@ private AbstractSearchAsyncAction createAction(SearchRequest Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), null, request, listener, new GroupShardsIterator<>( - List.of( - new SearchShardIterator(null, new ShardId("test", "testUUID", 0), Collections.emptyList(), null), - new SearchShardIterator(null, new ShardId("test", "testUUID", 0), Collections.emptyList(), null) + Collections.singletonList( + new SearchShardIterator(null, null, Collections.emptyList(), null) ) ), timeProvider, 0, null, results, request.getMaxConcurrentShardRequests(), From 6b16c70ae4ad9e7547bb123ec4457bcb070a73ba Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 21 Aug 2019 10:49:39 +0200 Subject: [PATCH 5/9] Rename test. 
--- .../action/search/AbstractSearchAsyncActionTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 0eddb6366c383..174b164aead26 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -239,7 +239,7 @@ public void run() { assertEquals(requestIds, releasedContexts); } - public void testOnShardNotAvailableDisallowPartialFailures() { + public void testShardNotAvailableWithDisallowPartialFailures() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); From c2df1ab6996e2e35f5e7fc8bbf139fdadca09c93 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Thu, 22 Aug 2019 14:10:25 +0200 Subject: [PATCH 6/9] Improve code and reduce indexing Index fewer docs to reduce runtime of test and improve code checking for partial shard failures to only check flag/successful ops once.
--- .../search/AbstractSearchAsyncAction.java | 21 +++++++++---------- .../search/basic/SearchRedStateIndexIT.java | 6 +++--- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 4f35e2a956df9..bce6d02a8c7fc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -140,7 +140,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha } else { Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; - if (allowPartialResults == false && shardFailures.get() != null) { + if (allowPartialResults == false && successfulOps.get() != getNumShards()) { // check if there are actual failures in the atomic array since // successful retries can reset the failures to null ShardOperationFailedException[] shardSearchFailures = buildShardFailures(); @@ -154,18 +154,17 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha } onPhaseFailure(currentPhase, "Partial shards failure", null); return; + } else { + int discrepancy = getNumShards() - successfulOps.get(); + assert discrepancy > 0 : "discrepancy: " + discrepancy; + if (logger.isDebugEnabled()) { + logger.debug("Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})", + discrepancy, successfulOps.get(), skippedOps.get(), getNumShards(), currentPhase.getName()); + } + onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null); + return; } } - if (allowPartialResults == false && successfulOps.get() != getNumShards()) { - int discrepancy = getNumShards() - 
successfulOps.get(); - assert discrepancy > 0 : "discrepancy: " + discrepancy; - if (logger.isDebugEnabled()) { - logger.debug("Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})", - discrepancy, successfulOps.get(), skippedOps.get(), getNumShards(), currentPhase.getName()); - } - onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null); - return; - } if (logger.isTraceEnabled()) { final String resultsFrom = results.getSuccessfulResults() .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(",")); diff --git a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index 26c879da37059..ea8ec72a78c49 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -100,9 +100,9 @@ public void testClusterDisallowPartialsWithRedState() throws Exception { } public void testDisallowPartialsWithRedStateRecovering() throws Exception { - int docCount = scaledRandomIntBetween(1000, 10000); + int docCount = scaledRandomIntBetween(10, 1000); logger.info("Using docCount [{}]", docCount); - buildIndex(cluster().numDataNodes(), 1, docCount); + buildIndex(cluster().numDataNodes() + 2, 1, docCount); AtomicBoolean stop = new AtomicBoolean(); List searchThreads = new ArrayList<>(); @@ -128,7 +128,7 @@ public void run() { void verify(Supplier call) { try { SearchResponse response = call.get(); - assertThat(response.getHits().getHits().length, equalTo(100)); + assertThat(response.getHits().getHits().length, equalTo(Math.min(100, docCount))); assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount)); } catch (Exception e) { // this is OK. 
From ea31de981ca64fe1f5779a298c599f0bf303dac3 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 26 Aug 2019 11:06:50 +0200 Subject: [PATCH 7/9] Removed integration test To be introduced separately as a disruption test. --- .../search/basic/SearchRedStateIndexIT.java | 108 ++---------------- 1 file changed, 7 insertions(+), 101 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index ea8ec72a78c49..6b34eab2659a2 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -28,17 +28,12 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESIntegTestCase; import org.junit.After; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -49,6 +44,7 @@ @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchRedStateIndexIT extends ESIntegTestCase { + public void testAllowPartialsWithRedState() throws Exception { final int numShards = cluster().numDataNodes()+2; buildRedIndex(numShards); @@ -99,89 +95,6 @@ public void testClusterDisallowPartialsWithRedState() throws Exception { assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shard")); } - public void testDisallowPartialsWithRedStateRecovering() throws Exception { 
- int docCount = scaledRandomIntBetween(10, 1000); - logger.info("Using docCount [{}]", docCount); - buildIndex(cluster().numDataNodes() + 2, 1, docCount); - - AtomicBoolean stop = new AtomicBoolean(); - List searchThreads = new ArrayList<>(); - // this is a little extreme, but necessary to make this test fail reasonably often (half the runs on my machine). - for (int i = 0; i < 100; ++i) { - Thread searchThread = new Thread() { - { - setDaemon(true); - } - - @Override - public void run() { - while (stop.get() == false) { - // todo: the timeouts below should not be necessary, but this test sometimes hangs without them and that is not - // the immediate purpose of the test. - verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)) - .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); - verify(() -> client().prepareSearch("test") - .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); - } - } - - void verify(Supplier call) { - try { - SearchResponse response = call.get(); - assertThat(response.getHits().getHits().length, equalTo(Math.min(100, docCount))); - assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount)); - } catch (Exception e) { - // this is OK. 
- logger.info("Failed with : " + e); - } - } - }; - searchThreads.add(searchThread); - searchThread.start(); - } - try { - Thread restartThread = new Thread() { - { - setDaemon(true); - } - - @Override - public void run() { - try { - for (int i = 0; i < 5; ++i) { - internalCluster().restartRandomDataNode(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - restartThread.start(); - for (int i = 0; i < 5; ++i) { - internalCluster().restartRandomDataNode(); - } - restartThread.join(30000); - assertFalse(restartThread.isAlive()); - } finally { - stop.set(true); - searchThreads.forEach(thread -> { - try { - thread.join(30000); - if (thread.isAlive()) { - logger.warn("Thread: " + thread + " is still alive"); - // do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me... - thread.join(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); - } - - // hack to ensure all search contexts are removed, seems we risk leaked search contexts when coordinator dies. 
- client().admin().indices().prepareDelete("test").get(); - } - private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) { String key = SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey(); @@ -197,22 +110,14 @@ private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) { } private void buildRedIndex(int numShards) throws Exception { - buildIndex(numShards, 0, 10); - - stopNodeAndEnsureRed(); - } - - private void buildIndex(int numShards, int numReplicas, int docCount) { - assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas))); + assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", + numShards).put("index.number_of_replicas", 0))); ensureGreen(); - for (int i = 0; i < docCount; i++) { - client().prepareIndex("test", "type1", ""+i).setSource("field1", i).get(); + for (int i = 0; i < 10; i++) { + client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").get(); } refresh(); - } - private void stopNodeAndEnsureRed() throws Exception { internalCluster().stopRandomDataNode(); client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get(); @@ -222,10 +127,11 @@ private void stopNodeAndEnsureRed() throws Exception { List unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED); assertThat(unassigneds.size(), greaterThan(0)); }); + } @After - public void cleanup() { + public void cleanup() throws Exception { assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().putNull(SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey()))); } From 04191836d950041897c75235e57c2cc3419f2992 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 26 Aug 2019 11:17:23 +0200 Subject: [PATCH 8/9] Revert a little more. 
--- .../main/java/org/elasticsearch/search/MockSearchService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index c1ebb1213495b..b9d9ff3cfc9bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -74,8 +74,8 @@ public MockSearchService(ClusterService clusterService, @Override protected void putContext(SearchContext context) { - addActiveContext(context); super.putContext(context); + addActiveContext(context); } @Override From 2767e18d4916e6a383ca679b3fbfba9f743d0a3e Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 26 Aug 2019 11:38:37 +0200 Subject: [PATCH 9/9] Reverted whitespace changes --- .../search/basic/SearchRedStateIndexIT.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index 6b34eab2659a2..676e50dfd5688 100644 --- a/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/test/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -117,19 +117,19 @@ private void buildRedIndex(int numShards) throws Exception { client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").get(); } refresh(); - - internalCluster().stopRandomDataNode(); - + + internalCluster().stopRandomDataNode(); + client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get(); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); List unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED); 
assertThat(unassigneds.size(), greaterThan(0)); - }); - + }); + } - + @After public void cleanup() throws Exception { assertAcked(client().admin().cluster().prepareUpdateSettings()