From 61c8ad906e90afb914b24250a7ca23d045b8fbe1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sat, 8 Sep 2018 19:18:22 +0200 Subject: [PATCH] [CCR] Added history uuid validation For correctness we need to verify whether the history uuid of the leader index shards never changes while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow api validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not the follow api fails. * While a follow index is following a leader index; shard follow tasks on each shard changes api call verify whether their current history uuid is the same as the recorded history uuid. Relates to #30086 --- .../xpack/ccr/FollowIndexSecurityIT.java | 17 ++-- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 + .../xpack/ccr/CcrLicenseChecker.java | 45 +++++++-- .../action/CreateAndFollowIndexAction.java | 56 ++++++++++- .../xpack/ccr/action/FollowIndexAction.java | 74 +++++++++++--- .../xpack/ccr/action/ShardChangesAction.java | 27 +++++- .../xpack/ccr/action/ShardFollowNodeTask.java | 5 + .../xpack/ccr/action/ShardFollowTask.java | 46 +++++++-- .../xpack/ccr/ShardChangesIT.java | 16 ++- .../ccr/action/FollowIndexActionTests.java | 97 ++++++++++++------- .../ccr/action/ShardChangesResponseTests.java | 8 +- .../ShardFollowNodeTaskRandomTests.java | 51 +++++++--- .../ccr/action/ShardFollowNodeTaskTests.java | 41 +++++--- .../ShardFollowTaskReplicationTests.java | 25 ++++- .../ccr/action/ShardFollowTaskTests.java | 4 +- 15 files changed, 394 insertions(+), 120 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 7d658550d92b9..f314d278a5400 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -8,6 +8,7 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -26,6 +27,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class FollowIndexSecurityIT extends ESRestTestCase { @@ -96,16 +98,13 @@ public void testFollowIndex() throws Exception { assertThat(countCcrNodeTasks(), equalTo(0)); }); - createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex); - // Verify that nothing has been replicated and no node tasks are running - // These node tasks should have been failed due to the fact that the user - // has no sufficient priviledges. - assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - verifyDocuments(adminClient(), unallowedIndex, 0); + Exception e = expectThrows(ResponseException.class, + () -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); - followIndex("leader_cluster:" + unallowedIndex, unallowedIndex); - assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - verifyDocuments(adminClient(), unallowedIndex, 0); + e = expectThrows(ResponseException.class, + () -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cd0561b1c0c60..5cdc1b0585de6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -82,6 +82,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; + public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY = "leader_index_history_uuid"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index cefa490f4f7e2..bd4db1f0aba4e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -10,19 +10,30 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; -import java.util.function.Consumer; /** * Encapsulates licensing checking for CCR. @@ -62,19 +73,19 @@ public boolean isCcrAllowed() { * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the * remote cluster. * - * @param client the client - * @param clusterAlias the remote cluster alias - * @param leaderIndex the name of the leader index - * @param listener the listener - * @param leaderIndexMetadataConsumer the leader index metadata consumer - * @param the type of response the listener is waiting for + * @param client the client + * @param clusterAlias the remote cluster alias + * @param leaderIndex the name of the leader index + * @param listener the listener + * @param historyUUIDAndLeaderIndexMetadataConsumer the leader index history uuid and the leader index metadata consumer + * @param the type of response the listener is waiting for */ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( final Client client, final String clusterAlias, final String leaderIndex, final ActionListener listener, - final Consumer leaderIndexMetadataConsumer) { + final BiConsumer, IndexMetaData> historyUUIDAndLeaderIndexMetadataConsumer) { // we have to check the license on the remote cluster new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses( Collections.singletonList(clusterAlias), @@ -93,7 +104,23 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe final ClusterState remoteClusterState = r.getState(); final IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); - leaderIndexMetadataConsumer.accept(leaderIndexMetadata); + CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + Map historyUUIDs = new HashMap<>(); + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + CommitStats commitStats = shardStats.getCommitStats(); + String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs.put(shardId.id(), historyUUID); + } + } + historyUUIDAndLeaderIndexMetadataConsumer.accept(historyUUIDs, leaderIndexMetadata); + }; + IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(leaderIndex); + remoteClient.admin().indices().stats(request, + ActionListener.wrap(indicesStatsHandler, listener::onFailure)); }, listener::onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 2e36bca293225..7f96fa4ea8524 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -12,6 +12,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -29,6 +34,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -36,18 +42,23 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public class CreateAndFollowIndexAction extends Action { @@ -242,8 +253,12 @@ protected void masterOperation( private void createFollowerIndexAndFollowLocalIndex( final Request request, final ClusterState state, final ActionListener listener) { // following an index in local cluster, so use local cluster state to fetch leader index metadata - final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); - createFollowerIndex(leaderIndexMetadata, request, listener); + final String leaderIndex = request.getFollowRequest().getLeaderIndex(); + final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); + Consumer> handler = historyUUID -> { + createFollowerIndex(leaderIndexMetadata, historyUUID, request, listener); + }; + fetchHistoryUUID(client, leaderIndex, handler, listener::onFailure); } private void createFollowerIndexAndFollowRemoteIndex( @@ -256,11 +271,14 @@ private void createFollowerIndexAndFollowRemoteIndex( clusterAlias, leaderIndex, listener, - leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener)); + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); } private void createFollowerIndex( - final IndexMetaData leaderIndexMetaData, final Request request, final ActionListener listener) { + final IndexMetaData leaderIndexMetaData, + final Map historyUUIDs, + final Request request, + final ActionListener listener) { if (leaderIndexMetaData == null) { listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() + "] does not exist")); @@ -296,6 +314,13 @@ public ClusterState execute(ClusterState currentState) throws Exception { MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); + // Adding the leader index uuid for each shard as custom metadata: + Map metadata = new HashMap<>(); + for (Map.Entry entry : historyUUIDs.entrySet()) { + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + entry.getKey(), entry.getValue()); + } + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + // Copy all settings, but overwrite a few settings. Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put(leaderIndexMetaData.getSettings()); @@ -350,6 +375,29 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); } + // would be great if can reuse some of the logic in CcrLicenseChecker to do remote calls for + // fetching leader index metadata and leader index uuid + static void fetchHistoryUUID(final Client client, + final String leaderIndex, + final Consumer> handler, + final Consumer errorHandler) { + IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(leaderIndex); + CheckedConsumer onResponseHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + Map historyUUIDs = new HashMap<>(); + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs.put(shardId.id(), historyUUID); + } + } + handler.accept(historyUUIDs); + }; + client.admin().indices().stats(request, ActionListener.wrap(onResponseHandler, errorHandler)); + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 17b7bbe674b38..f40ff882c4086 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -47,11 +47,13 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -62,6 +64,8 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.TransportAction.fetchHistoryUUID; + public class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); @@ -352,11 +356,13 @@ private void followLocalIndex(final Request request, final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); // following an index in local cluster, so use local cluster state to fetch leader index metadata final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); - try { - start(request, null, leaderIndexMetadata, followerIndexMetadata, listener); - } catch (final IOException e) { - listener.onFailure(e); - } + fetchHistoryUUID(client, request.getLeaderIndex(), historyUUIDs -> { + try { + start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); + } catch (final IOException e) { + listener.onFailure(e); + } + }, listener::onFailure); } private void followRemoteIndex( @@ -371,9 +377,9 @@ private void followRemoteIndex( clusterAlias, leaderIndex, listener, - leaderIndexMetadata -> { + (leaderHistoryUUID, leaderIndexMetadata) -> { try { - start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); + start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } @@ -395,25 +401,37 @@ void start( String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, + Map leaderIndexHistoryUUIDs, ActionListener handler) throws IOException { MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; - validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); + validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService); final int numShards = followIndexMetadata.getNumberOfShards(); final AtomicInteger counter = new AtomicInteger(numShards); final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + String recordedLeaderIndexHistoryUUID = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + shardId); ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes, - request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout, - request.idleShardRetryDelay, filteredHeaders); + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.maxBatchOperationCount, + request.maxConcurrentReadBatches, + request.maxOperationSizeInBytes, + request.maxConcurrentWriteBatches, + request.maxWriteBufferSize, + request.retryTimeout, + request.idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + filteredHeaders + ); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override @@ -510,13 +528,27 @@ void finalizeResponse() { static void validate(Request request, IndexMetaData leaderIndex, - IndexMetaData followIndex, MapperService followerMapperService) { + IndexMetaData followIndex, + Map leaderIndexHistoryUUID, + MapperService followerMapperService) { if (leaderIndex == null) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); } if (followIndex == null) { throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist"); } + + Map recordedHistoryUUIDs = convert(followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)); + assert recordedHistoryUUIDs.size() == leaderIndexHistoryUUID.size(); + for (Map.Entry entry : leaderIndexHistoryUUID.entrySet()) { + String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs.get(entry.getKey()); + if (entry.getValue().equals(recordedLeaderIndexHistoryUUID) == false) { + throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + + entry.getValue() + "] as history uuid but instead reference [" + recordedLeaderIndexHistoryUUID + + "] as history uuid"); + } + } + if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled"); } @@ -568,4 +600,16 @@ private static Settings filter(Settings originalSettings) { return settings.build(); } + private static Map convert(Map ccrMetadata) { + Map historyUUIDsByShard = new HashMap<>(); + for (Map.Entry entry : ccrMetadata.entrySet()) { + if (entry.getKey().startsWith(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY)) { + int shardId = Integer.parseInt(entry.getKey().replace(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_", "")); + String previousValue = historyUUIDsByShard.put(shardId, entry.getValue()); + assert previousValue == null; + } + } + return historyUUIDsByShard; + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index b505ee015bab6..c565f8f582feb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -179,6 +179,16 @@ public long getMaxSeqNo() { return maxSeqNo; } + private String historyUUID; + + public String getHistoryUUID() { + return historyUUID; + } + + public void setHistoryUUID(String historyUUID) { + this.historyUUID = historyUUID; + } + private Translog.Operation[] operations; public Translog.Operation[] getOperations() { @@ -188,10 +198,17 @@ public Translog.Operation[] getOperations() { Response() { } - Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { + Response( + final long mappingVersion, + final long globalCheckpoint, + final long maxSeqNo, + final String historyUUID, + final Translog.Operation[] operations) { + this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; + this.historyUUID = historyUUID; this.operations = operations; } @@ -201,6 +218,7 @@ public void readFrom(final StreamInput in) throws IOException { mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); + historyUUID = in.readString(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -210,6 +228,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); + out.writeString(historyUUID); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -221,12 +240,13 @@ public boolean equals(final Object o) { return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && + Objects.equals(historyUUID, that.historyUUID) && Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, historyUUID, Arrays.hashCode(operations)); } } @@ -253,6 +273,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final String historyUUID = indexShard.getHistoryUUID(); final Translog.Operation[] operations = getOperations( indexShard, @@ -260,7 +281,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.fromSeqNo, request.maxOperationCount, request.maxOperationSizeInBytes); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), historyUUID, operations); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 00e3aaaae2a8e..92ea1b8fe14ab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -278,6 +278,11 @@ protected void onOperationsFetched(Translog.Operation[] operations) { synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { onOperationsFetched(response.getOperations()); + if (params.getRecordedLeaderIndexHistoryUUID().equals(response.getHistoryUUID()) == false) { + markAsFailed(new IllegalStateException("unexpected history uuid, expected [" + + params.getRecordedLeaderIndexHistoryUUID() + "], actual [" + response.getHistoryUUID() + "]")); + return; + } leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo()); final long newFromSeqNo; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 82482792f3907..4b96da9d923e1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -50,12 +50,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); + (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map) a[15])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -76,6 +77,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -89,11 +91,22 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxWriteBufferSize; private final TimeValue retryTimeout; private final TimeValue idleShardRetryDelay; + private final String recordedLeaderIndexHistoryUUID; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount, - int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches, - int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map headers) { + ShardFollowTask( + String leaderClusterAlias, + ShardId followShardId, + ShardId leaderShardId, + int maxBatchOperationCount, + int maxConcurrentReadBatches, + long maxBatchSizeInBytes, + int maxConcurrentWriteBatches, + int maxWriteBufferSize, + TimeValue retryTimeout, + TimeValue idleShardRetryDelay, + String recordedLeaderIndexHistoryUUID, + Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -104,6 +117,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = maxWriteBufferSize; this.retryTimeout = retryTimeout; this.idleShardRetryDelay = idleShardRetryDelay; + this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -118,6 +132,7 @@ public ShardFollowTask(StreamInput in) throws IOException { this.maxWriteBufferSize = in.readVInt(); this.retryTimeout = in.readTimeValue(); this.idleShardRetryDelay = in.readTimeValue(); + this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -165,6 +180,10 @@ public String getTaskId() { return followShardId.getIndex().getUUID() + "-" + followShardId.getId(); } + public String getRecordedLeaderIndexHistoryUUID() { + return recordedLeaderIndexHistoryUUID; + } + public Map getHeaders() { return headers; } @@ -186,6 +205,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxWriteBufferSize); out.writeTimeValue(retryTimeout); out.writeTimeValue(idleShardRetryDelay); + out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -212,6 +232,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -231,13 +252,26 @@ public boolean equals(Object o) { maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(retryTimeout, that.retryTimeout) && Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, - maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers); + return Objects.hash( + leaderClusterAlias, + followShardId, + leaderShardId, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxConcurrentWriteBatches, + maxBatchSizeInBytes, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + headers + ); } public String toString() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 7980e1281406a..bbbb7f9517bd6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -357,16 +358,11 @@ public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - - final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); - assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); - internalCluster().ensureAtLeastNumDataNodes(2); - ensureGreen("index1", "index2"); + ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); - client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get(); final int numDocs = randomIntBetween(2, 64); for (int i = 0; i < numDocs; i++) { @@ -409,13 +405,13 @@ public void testFollowNonExistentIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("test-follower").get()); // Leader index does not exist. FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); // Follower index does not exist. FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); // Both indices do not exist. FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); } @TestLogging("_root:DEBUG") diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 5b52700f5579b..64081d54c539b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -13,66 +13,87 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.ShardChangesIT; import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.equalTo; public class FollowIndexActionTests extends ESTestCase { + private static final Map CUSTOM_METADATA = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "uuid"); + public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); + Map UUIDs = Collections.singletonMap(0, "uuid"); { // should fail, because leader index does not exist - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { // should fail, because follow index does not exist - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null)); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, null, null, null)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } + { + // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Map customMetaData = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "another-uuid"); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); + assertThat(e.getMessage(), equalTo("follow index [index2] should reference [uuid] as history uuid but " + + "instead reference [another-uuid] as history uuid")); + } { // should fail because leader index does not have soft deletes enabled - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { // should fail because the number of primary shards between leader and follow index are not equal IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); - IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); + IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY, CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { // should fail, because leader index is closed IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader and follow index must be open")); } { // should fail, because leader has a field with the same name mapped as keyword and follower as text IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5, - Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]")); } { @@ -81,39 +102,39 @@ public void testValidation() throws IOException { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical")); } { // should fail because the following index does not have the following_index settings IndexMetaData leaderIMD = createIMD("index1", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); Settings followingIndexSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build(); - IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings); + IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings, CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followingIndexSettings, "index2"); mapperService.updateMapping(null, followIMD); IllegalArgumentException error = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(error.getMessage(), equalTo("the following index [index2] is not ready to follow; " + "the setting [index.xpack.ccr.following_index] must be enabled.")); } { // should succeed IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", 5, Settings.builder() - .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed, index settings are identical @@ -121,15 +142,15 @@ public void testValidation() throws IOException { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed despite whitelisted settings being different @@ -138,25 +159,32 @@ public void testValidation() throws IOException { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } } - private static IndexMetaData createIMD(String index, int numberOfShards, Settings settings) throws IOException { - return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings); + private static IndexMetaData createIMD(String index, + int numberOfShards, + Settings settings, + Map custom) throws IOException { + return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings, custom); } - private static IndexMetaData createIMD(String index, State state, String mapping, int numberOfShards, - Settings settings) throws IOException { + private static IndexMetaData createIMD(String index, + State state, + String mapping, + int numberOfShards, + Settings settings, + Map custom) throws IOException { return IndexMetaData.builder(index) .settings(settings(Version.CURRENT).put(settings)) .numberOfShards(numberOfShards) @@ -164,6 +192,7 @@ private static IndexMetaData createIMD(String index, State state, String mapping .numberOfReplicas(0) .setRoutingNumShards(numberOfShards) .putMapping("_doc", mapping) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, custom) .build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index e9c67097d72b2..b84397654af1f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -20,7 +20,13 @@ protected ShardChangesAction.Response createTestInstance() { for (int i = 0; i < numOps; i++) { operations[i] = new Translog.NoOp(i, 0, "test"); } - return new ShardChangesAction.Response(mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations); + return new ShardChangesAction.Response( + mappingVersion, + leaderGlobalCheckpoint, + leaderMaxSeqNo, + randomAlphaOfLength(4), + operations + ); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 9bfd6b9d6ef42..856915903cfcf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -73,10 +73,20 @@ private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testRun) { AtomicBoolean stopped = new AtomicBoolean(false); - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency, - ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + testRun.maxOperationCount, + concurrency, + ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + concurrency, + 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); BiConsumer scheduler = (delay, task) -> { @@ -146,7 +156,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0])); + handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, "uuid", new Translog.Operation[0])); } }; threadPool.generic().execute(task); @@ -213,8 +223,17 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } - item.add(new TestResponse(null, mappingVersion, - new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); + item.add(new TestResponse( + null, + mappingVersion, + new ShardChangesAction.Response( + mappingVersion, + nextGlobalCheckPoint, + nextGlobalCheckPoint, + "uuid", + ops.toArray(EMPTY)) + ) + ); responses.put(prevGlobalCheckpoint, item); } else { // Simulates a leader shard copy not having all the operations the shard follow task thinks it has by @@ -230,8 +249,13 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + prevGlobalCheckpoint, + prevGlobalCheckpoint, + "uuid", + EMPTY + ); item.add(new TestResponse(null, mappingVersion, response)); } List ops = new ArrayList<>(); @@ -242,8 +266,13 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, } // Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind: long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + localLeaderGCP, + localLeaderGCP, + "uuid", + ops.toArray(EMPTY) + ); item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 4f7c0bf16645c..11d11f8826d21 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -393,7 +393,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, "uuid", new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -425,7 +425,7 @@ public void testDelayCoordinatesRead() { // Also invokes coordinateReads() task.innerHandleReadResponse(0L, 63L, response); task.innerHandleReadResponse(64L, 63L, - new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0])); + new ShardChangesAction.Response(0, 63L, 63L, "uuid", new Translog.Operation[0])); assertThat(counter[0], equalTo(1)); } @@ -714,9 +714,20 @@ public void testHandleWriteResponse() { ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes, - maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + maxBatchOperationCount, + maxConcurrentReadBatches, + maxBatchSizeInBytes, + maxConcurrentWriteBatches, + bufferWriteLimit, + TimeValue.ZERO, + TimeValue.ZERO, + "uuid", + Collections.emptyMap() + ); shardChangesRequests = new ArrayList<>(); bulkShardOperationRequests = new ArrayList<>(); @@ -777,12 +788,13 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con for (int i = 0; i < requestBatchSize; i++) { operations[i] = new Translog.NoOp(from + i, 0, "test"); } - final ShardChangesAction.Response response = - new ShardChangesAction.Response( - mappingVersions.poll(), - leaderGlobalCheckpoints.poll(), - maxSeqNos.poll(), - operations); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersions.poll(), + leaderGlobalCheckpoints.poll(), + maxSeqNos.poll(), + "uuid", + operations + ); handler.accept(response); } } @@ -814,7 +826,12 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } return new ShardChangesAction.Response( - mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); + mappingVersion, + leaderGlobalCheckPoint, + leaderGlobalCheckPoint, + "uuid", + ops.toArray(new Translog.Operation[0]) + ); } void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 2cd024cb03cf7..73cae2ce8b011 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -159,9 +159,19 @@ private ReplicationGroup createFollowGroup(int replicas) throws IOException { } private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + between(1, 64), + between(1, 8), + Long.MAX_VALUE, + between(1, 4), 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); @@ -212,8 +222,13 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here - final ShardChangesAction.Response response = - new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, + seqNoStats.getGlobalCheckpoint(), + seqNoStats.getMaxSeqNo(), + "uuid", + ops + ); handler.accept(response); return; } catch (Exception e) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 300794a6c00cf..fa11ddf4bf976 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -34,7 +34,9 @@ protected ShardFollowTask createTestInstance() { randomIntBetween(1, Integer.MAX_VALUE), TimeValue.parseTimeValue(randomTimeValue(), ""), TimeValue.parseTimeValue(randomTimeValue(), ""), - randomBoolean() ? null : Collections.singletonMap("key", "value")); + randomAlphaOfLength(4), + randomBoolean() ? null : Collections.singletonMap("key", "value") + ); } @Override