Skip to content

Commit

Permalink
[CCR] Retry when no index shard stats can be found (#34852)
Browse files Browse the repository at this point in the history
Index shard stats for the follower shard are fetched, when a shard follow task is started.
This is needed in order to bootstap the shard follow task with the follower global checkpoint.

Sometimes index shard stats are not available (e.g. during a restart) and
we fail now, while it is very likely that these stats will be available some time later.
  • Loading branch information
martijnvg committed Oct 26, 2018
1 parent 6ca1048 commit 359cbf1
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ private void fetchFollowerShardInfo(
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
if (indexStats == null) {
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
if (indexMetaData != null) {
errorHandler.accept(new ShardNotFoundException(shardId));
} else {
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
Expand All @@ -24,9 +27,11 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
Expand All @@ -35,6 +40,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
Expand All @@ -47,6 +53,9 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -58,14 +67,17 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -284,6 +296,88 @@ protected void ensureEmptyWriteBuffers() throws Exception {
});
}

protected void pauseFollow(String... indices) throws Exception {
for (String index : indices) {
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}

protected void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks(), empty());

ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
}, 30, TimeUnit.SECONDS);
}

protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
}
builder.endObject();
builder.startObject("mappings");
{
builder.startObject("doc");
{
builder.startObject("properties");
{
builder.startObject("f");
{
builder.field("type", "integer");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
settings = BytesReference.bytes(builder).utf8ToString();
}
return settings;
}

public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster("leader_cluster");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(resumeFollow(followerIndex));
return request;
}

public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
return request;
}

static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -757,33 +756,6 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
};
}

private void pauseFollow(String... indices) throws Exception {
for (String index : indices) {
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}

private void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks(), empty());

ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
}, 30, TimeUnit.SECONDS);
}

private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> {
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
Expand All @@ -793,45 +765,6 @@ private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int valu
};
}

private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
}
builder.endObject();
builder.startObject("mappings");
{
builder.startObject("doc");
{
builder.startObject("properties");
{
builder.startObject("f");
{
builder.field("type", "integer");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
settings = BytesReference.bytes(builder).utf8ToString();
}
return settings;
}

private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
Expand Down Expand Up @@ -969,19 +902,4 @@ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numbe
});
}

public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster("leader_cluster");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(resumeFollow(followerIndex));
return request;
}

public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;

import java.util.Locale;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class RestartIndexFollowingIT extends CcrIntegTestCase {

@Override
protected int numberOfNodesPerCluster() {
return 1;
}

public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();

final long firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}

assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs));
});

getFollowerCluster().fullRestart();
ensureFollowerGreen("index2");

final long secondBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < secondBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}

assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs));
});
}

}

0 comments on commit 359cbf1

Please sign in to comment.