Disallow partial results when shard unavailable #45739

Merged
Changes from 8 commits
AbstractSearchAsyncAction.java
@@ -156,6 +156,16 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
                 return;
             }
         }
+        if (allowPartialResults == false && successfulOps.get() != getNumShards()) {
Contributor:

We shouldn't have two tests to check if a partial failure occurred. This one could replace the block above: if successfulOps is less than the number of shards and shardSearchFailures.length is 0, we can assume that the failure was a shard-unavailable exception?
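
A sketch of the consolidated check the reviewer describes (hypothetical shape only — names such as buildShardFailures, successfulOps and onPhaseFailure are taken from the surrounding class; the actual change landed in c2df1ab):

    // One test for partial failures: if not every shard responded successfully,
    // distinguish recorded shard failures from shards that were simply unavailable.
    if (allowPartialResults == false && successfulOps.get() != getNumShards()) {
        ShardSearchFailure[] shardSearchFailures = buildShardFailures();
        if (shardSearchFailures.length == 0) {
            // nothing was recorded as failed, so the missing shards had no available copy
            int discrepancy = getNumShards() - successfulOps.get();
            onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null);
        } else {
            onPhaseFailure(currentPhase, "Partial shards failure", null);
        }
        return;
    }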

Contributor Author:

Fixed in c2df1ab

+            int discrepancy = getNumShards() - successfulOps.get();
+            assert discrepancy > 0 : "discrepancy: " + discrepancy;
+            if (logger.isDebugEnabled()) {
+                logger.debug("Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
+                    discrepancy, successfulOps.get(), skippedOps.get(), getNumShards(), currentPhase.getName());
+            }
+            onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null);
+            return;
+        }
         if (logger.isTraceEnabled()) {
             final String resultsFrom = results.getSuccessfulResults()
                 .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
AbstractSearchAsyncActionTests.java
@@ -83,7 +83,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(SearchRequest
             return null;
         };
 
-        return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, nodeIdToConnection,
+        return new AbstractSearchAsyncAction<SearchPhaseResult>("test", logger, null, nodeIdToConnection,
             Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f),
             Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), null, request, listener,
             new GroupShardsIterator<>(
@@ -239,6 +239,29 @@ public void run() {
         assertEquals(requestIds, releasedContexts);
     }
 
+    public void testShardNotAvailableWithDisallowPartialFailures() {
+        SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
+        int numShards = randomIntBetween(2, 10);
+        InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> phaseResults =
+            new InitialSearchPhase.ArraySearchPhaseResults<>(numShards);
+        AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
+        // skip one to avoid the "all shards failed" failure.
+        SearchShardIterator skipIterator = new SearchShardIterator(null, null, Collections.emptyList(), null);
+        skipIterator.resetAndSkip();
+        action.skipShard(skipIterator);
+        // expect at least 2 shards, so onPhaseDone should report failure.
+        action.onPhaseDone();
+        assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
+        SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException) exception.get();
+        assertEquals("Partial shards failure (" + (numShards - 1) + " shards unavailable)",
+            searchPhaseExecutionException.getMessage());
+        assertEquals("test", searchPhaseExecutionException.getPhaseName());
+        assertEquals(0, searchPhaseExecutionException.shardFailures().length);
+        assertEquals(0, searchPhaseExecutionException.getSuppressed().length);
+    }
+
     private static InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> phaseResults(Set<Long> requestIds,
                                                                                               List<Tuple<String, String>> nodeLookups,
                                                                                               int numFailures) {
SearchRedStateIndexIT.java
@@ -28,12 +28,17 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.junit.After;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
@@ -44,7 +49,6 @@
 @ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
 public class SearchRedStateIndexIT extends ESIntegTestCase {
 
-
     public void testAllowPartialsWithRedState() throws Exception {
         final int numShards = cluster().numDataNodes()+2;
         buildRedIndex(numShards);
@@ -95,6 +99,89 @@ public void testClusterDisallowPartialsWithRedState() throws Exception {
         assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shard"));
     }
 
+    public void testDisallowPartialsWithRedStateRecovering() throws Exception {
Contributor:

I wonder if it would be simpler to add a unit test in FetchSearchPhaseTests and AbstractSearchAsyncActionTests? We just need a child query that throws a ShardNotAvailableException, and the reason why the exception is thrown is not really important. I also don't get why you need a test that fails reasonably often? Shouldn't it fail all the time?
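
For illustration, a rough sketch of the kind of deterministic unit test being suggested (hypothetical — it assumes the createAction setup from the diff above, that onShardFailure(shardIndex, target, exception) is visible to the test, and uses NoShardAvailableActionException to stand in for a shard-unavailable failure; the test that actually landed, testShardNotAvailableWithDisallowPartialFailures, is shown earlier in this diff):

    // Report one shard as unavailable instead of waiting for a red cluster state.
    SearchShardTarget shard = new SearchShardTarget("node_0", new ShardId("test", "_na_", 0), null, OriginalIndices.NONE);
    action.onShardFailure(0, shard, new NoShardAvailableActionException(shard.getShardId()));
    action.onPhaseDone();
    // Shard-unavailable failures are not recorded in shardFailures, so with
    // allowPartialSearchResults(false) the phase must fail on its own check.
    assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));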

+        int docCount = scaledRandomIntBetween(1000, 10000);
Contributor:

Do we need this number of docs for the test? 10 or 100 should be enough, no?

Contributor Author:

The test has a tendency to fail more often on larger sizes. I ended up with 10, 1000, since this makes it fail around 1/3 of the runs (but this is likely highly dependent on the hardware). Changed in: c2df1ab

logger.info("Using docCount [{}]", docCount);
buildIndex(cluster().numDataNodes(), 1, docCount);

AtomicBoolean stop = new AtomicBoolean();
List<Thread> searchThreads = new ArrayList<>();
// this is a little extreme, but necessary to make this test fail reasonably often (half the runs on my machine).
for (int i = 0; i < 100; ++i) {
Thread searchThread = new Thread() {
{
setDaemon(true);
}

@Override
public void run() {
while (stop.get() == false) {
// todo: the timeouts below should not be necessary, but this test sometimes hangs without them and that is not
// the immediate purpose of the test.
verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0))
.setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10)));
verify(() -> client().prepareSearch("test")
.setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10)));
}
}

void verify(Supplier<SearchResponse> call) {
try {
SearchResponse response = call.get();
assertThat(response.getHits().getHits().length, equalTo(100));
assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount));
} catch (Exception e) {
// this is OK.
logger.info("Failed with : " + e);
}
}
};
searchThreads.add(searchThread);
searchThread.start();
}
try {
Thread restartThread = new Thread() {
{
setDaemon(true);
}

@Override
public void run() {
try {
for (int i = 0; i < 5; ++i) {
internalCluster().restartRandomDataNode();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
restartThread.start();
for (int i = 0; i < 5; ++i) {
internalCluster().restartRandomDataNode();
}
restartThread.join(30000);
assertFalse(restartThread.isAlive());
} finally {
stop.set(true);
searchThreads.forEach(thread -> {
try {
thread.join(30000);
if (thread.isAlive()) {
logger.warn("Thread: " + thread + " is still alive");
// do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me...
thread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}

// hack to ensure all search contexts are removed, seems we risk leaked search contexts when coordinator dies.
client().admin().indices().prepareDelete("test").get();
}

     private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) {
         String key = SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey();
 
@@ -110,28 +197,35 @@ private void setClusterDefaultAllowPartialResults(boolean allowPartialResults) {
     }
 
     private void buildRedIndex(int numShards) throws Exception {
-        assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards",
-            numShards).put("index.number_of_replicas", 0)));
+        buildIndex(numShards, 0, 10);
+
+        stopNodeAndEnsureRed();
+    }
+
+    private void buildIndex(int numShards, int numReplicas, int docCount) {
+        assertAcked(prepareCreate("test").setSettings(Settings.builder()
+            .put("index.number_of_shards", numShards).put("index.number_of_replicas", numReplicas)));
         ensureGreen();
-        for (int i = 0; i < 10; i++) {
-            client().prepareIndex("test", "type1", ""+i).setSource("field1", "value1").get();
+        for (int i = 0; i < docCount; i++) {
+            client().prepareIndex("test", "type1", ""+i).setSource("field1", i).get();
         }
         refresh();
-
-        internalCluster().stopRandomDataNode();
-
+    }
+
+    private void stopNodeAndEnsureRed() throws Exception {
+        internalCluster().stopRandomDataNode();
+
         client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get();
 
         assertBusy(() -> {
             ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
             List<ShardRouting> unassigneds = clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
             assertThat(unassigneds.size(), greaterThan(0));
-        });
-
+        });
     }
 
     @After
-    public void cleanup() throws Exception {
+    public void cleanup() {
         assertAcked(client().admin().cluster().prepareUpdateSettings()
             .setTransientSettings(Settings.builder().putNull(SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.getKey())));
     }
MockSearchService.java
@@ -74,8 +74,8 @@ public MockSearchService(ClusterService clusterService,
 
     @Override
     protected void putContext(SearchContext context) {
-        super.putContext(context);
         addActiveContext(context);
+        super.putContext(context);
     }
 
     @Override