Skip to content

Commit

Permalink
[ML] Fix problem with lost shards in distributed failure test (#43153)
Browse files Browse the repository at this point in the history
We were stopping a node in the cluster at a time when
the replica shards of the .ml-state index might not
have been created. This change moves the wait for
green status to a point where the .ml-state index
exists.

Fixes #40546
Fixes #41742

Forward port of #43111
  • Loading branch information
droberts195 committed Jun 17, 2019
1 parent b91d607 commit ea61b96
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
Expand Down Expand Up @@ -64,19 +63,19 @@ public void testFailOverBasics() throws Exception {
Job.Builder job = createJob("fail-over-basics-job", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
ensureGreen();
ensureYellow(); // at least the primary shards of the indices a job uses should be started
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
awaitJobOpenedAndAssigned(job.getId(), null);

ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
internalCluster().stopRandomDataNode();
ensureStableCluster(3);
ensureGreen();
awaitJobOpenedAndAssigned(job.getId(), null);

ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
internalCluster().stopRandomDataNode();
ensureStableCluster(2);
ensureGreen();
awaitJobOpenedAndAssigned(job.getId(), null);
}

Expand Down Expand Up @@ -106,7 +105,7 @@ public void testFailOverBasics_withDataFeeder() throws Exception {
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();

ensureGreen();
ensureYellow(); // at least the primary shards of the indices a job uses should be started
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
awaitJobOpenedAndAssigned(job.getId(), null);
Expand All @@ -120,9 +119,9 @@ public void testFailOverBasics_withDataFeeder() throws Exception {
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});

ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
internalCluster().stopRandomDataNode();
ensureStableCluster(3);
ensureGreen();
awaitJobOpenedAndAssigned(job.getId(), null);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
Expand All @@ -131,9 +130,9 @@ public void testFailOverBasics_withDataFeeder() throws Exception {
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});

ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
internalCluster().stopRandomDataNode();
ensureStableCluster(2);
ensureGreen();
awaitJobOpenedAndAssigned(job.getId(), null);
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse =
Expand Down Expand Up @@ -171,6 +170,7 @@ public void testJobAutoClose() throws Exception {
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();

ensureYellow(); // at least the primary shards of the indices a job uses should be started
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();

StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
Expand All @@ -183,7 +183,6 @@ public void testJobAutoClose() throws Exception {
});
}

@TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG,org.elasticsearch.xpack.ml.action:DEBUG")
public void testDedicatedMlNode() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
// start 2 non ml nodes that will never get a job allocated (but ml apis are accessible from these nodes)
Expand All @@ -203,6 +202,7 @@ public void testDedicatedMlNode() throws Exception {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();

ensureYellow(); // at least the primary shards of the indices a job uses should be started
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> {
Expand Down Expand Up @@ -277,6 +277,7 @@ public void testMaxConcurrentJobAllocations() throws Exception {
}
});

ensureYellow(); // at least the primary shards of the indices a job uses should be started
int numJobs = numMlNodes * 10;
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i), new ByteSizeValue(2, ByteSizeUnit.MB));
Expand Down

0 comments on commit ea61b96

Please sign in to comment.