diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 5a82b45cf8c38..88d07566c74bd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -205,7 +205,12 @@ private void fetchFollowerShardInfo( client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); if (indexStats == null) { - errorHandler.accept(new IndexNotFoundException(shardId.getIndex())); + IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + if (indexMetaData != null) { + errorHandler.accept(new ShardNotFoundException(shardId)); + } else { + errorHandler.accept(new IndexNotFoundException(shardId.getIndex())); + } return; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index cfd77b04a8baf..adde229bc0322 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -8,6 +8,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -24,9 +27,11 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -35,6 +40,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -47,6 +53,9 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -58,14 +67,17 @@ import java.util.Collection; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -284,6 +296,88 @@ protected void ensureEmptyWriteBuffers() throws Exception { }); } + protected void pauseFollow(String... indices) throws Exception { + for (String index : indices) { + final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index); + followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get(); + } + ensureNoCcrTasks(); + } + + protected void ensureNoCcrTasks() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks(), empty()); + + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get(); + int numNodeTasks = 0; + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { + numNodeTasks++; + } + } + assertThat(numNodeTasks, equalTo(0)); + }, 30, TimeUnit.SECONDS); + } + + protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas, + final Map additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); + for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + builder.startObject("mappings"); + { + builder.startObject("doc"); + { + builder.startObject("properties"); + { + builder.startObject("f"); + { + builder.field("type", "integer"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + settings = BytesReference.bytes(builder).utf8ToString(); + } + return settings; + } + + public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setRemoteCluster("leader_cluster"); + request.setLeaderIndex(leaderIndex); + request.setFollowRequest(resumeFollow(followerIndex)); + return request; + } + + public static ResumeFollowAction.Request resumeFollow(String followerIndex) { + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + request.setFollowerIndex(followerIndex); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + return request; + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 1c19c393e69e0..387fac21976dd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,7 +8,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -757,33 +756,6 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f }; } - private void pauseFollow(String... indices) throws Exception { - for (String index : indices) { - final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index); - followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get(); - } - ensureNoCcrTasks(); - } - - private void ensureNoCcrTasks() throws Exception { - assertBusy(() -> { - final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks(), empty()); - - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; - } - } - assertThat(numNodeTasks, equalTo(0)); - }, 30, TimeUnit.SECONDS); - } - private CheckedRunnable assertExpectedDocumentRunnable(final int value) { return () -> { final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get(); @@ -793,45 +765,6 @@ private CheckedRunnable assertExpectedDocumentRunnable(final int valu }; } - private String getIndexSettings(final int numberOfShards, final int numberOfReplicas, - final Map additionalIndexSettings) throws IOException { - final String settings; - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - { - builder.startObject("settings"); - { - builder.field("index.number_of_shards", numberOfShards); - builder.field("index.number_of_replicas", numberOfReplicas); - for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { - builder.field(additionalSetting.getKey(), additionalSetting.getValue()); - } - } - builder.endObject(); - builder.startObject("mappings"); - { - builder.startObject("doc"); - { - builder.startObject("properties"); - { - builder.startObject("f"); - { - builder.field("type", "integer"); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - settings = BytesReference.bytes(builder).utf8ToString(); - } - return settings; - } - private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { final String settings; @@ -969,19 +902,4 @@ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numbe }); } - public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { - PutFollowAction.Request request = new PutFollowAction.Request(); - request.setRemoteCluster("leader_cluster"); - request.setLeaderIndex(leaderIndex); - request.setFollowRequest(resumeFollow(followerIndex)); - return request; - } - - public static ResumeFollowAction.Request resumeFollow(String followerIndex) { - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setFollowerIndex(followerIndex); - request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); - request.setReadPollTimeout(TimeValue.timeValueMillis(10)); - return request; - } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java new file mode 100644 index 0000000000000..49fbe15ddabae --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.Locale; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class RestartIndexFollowingIT extends CcrIntegTestCase { + + @Override + protected int numberOfNodesPerCluster() { + return 1; + } + + public void testFollowIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen("index1"); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs)); + }); + + getFollowerCluster().fullRestart(); + ensureFollowerGreen("index2"); + + final long secondBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < secondBatchNumDocs; i++) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + }); + } + +}