From 39ba013d944e07031b17e60e864054f25803090d Mon Sep 17 00:00:00 2001 From: Iraklis Psaroudakis Date: Tue, 31 Jan 2023 11:31:36 +0200 Subject: [PATCH] Unpromotables skip replication and peer recovery (#93210) For skipping replication: * ReplicationTracker and Group filter shards that are promotable to primary * Remove unpromotable shards from in sync allocations in metadata * There is a new Refresh action for unpromotable replica shards Fixes ES-4861 For skipping peer recovery: * Unpromotable shards pass directly to STARTED skipping some intermediate peer recovery stages and messages Fixes ES-5257 --- docs/changelog/93210.yaml | 5 + .../cluster/routing/ShardRoutingRoleIT.java | 173 +++++++++++++++++- .../refresh/ReplicaShardRefreshRequest.java | 58 ------ .../refresh/TransportShardRefreshAction.java | 80 +++++--- ...ansportUnpromotableShardRefreshAction.java | 47 +++++ .../UnpromotableShardRefreshRequest.java | 59 ++++++ .../replication/ReplicationOperation.java | 1 + .../cluster/routing/IndexRoutingTable.java | 4 +- .../routing/IndexShardRoutingTable.java | 14 +- .../allocation/IndexMetadataUpdater.java | 14 +- .../index/seqno/ReplicationTracker.java | 23 ++- .../elasticsearch/index/shard/IndexShard.java | 28 ++- .../index/shard/ReplicationGroup.java | 5 +- .../recovery/PeerRecoveryTargetService.java | 83 +++++---- .../recovery/RecoveriesCollection.java | 1 + ...portVerifyShardBeforeCloseActionTests.java | 2 +- ...TransportResyncReplicationActionTests.java | 2 +- .../ReplicationOperationTests.java | 2 +- .../TransportReplicationActionTests.java | 7 +- .../cluster/ClusterStateTests.java | 6 +- .../metadata/AutoExpandReplicasTests.java | 8 +- .../index/engine/EngineTestCase.java | 2 +- 22 files changed, 464 insertions(+), 160 deletions(-) create mode 100644 docs/changelog/93210.yaml delete mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java diff --git a/docs/changelog/93210.yaml b/docs/changelog/93210.yaml new file mode 100644 index 0000000000000..179f4ab9dec8d --- /dev/null +++ b/docs/changelog/93210.yaml @@ -0,0 +1,5 @@ +pr: 93210 +summary: Unpromotables skip replication and peer recovery +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java index 5b47e6d08acc4..2f186a41139b7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/ShardRoutingRoleIT.java @@ -10,10 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -28,6 +31,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.shard.IndexShard; @@ -39,6 +43,8 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Arrays; @@ -46,16 +52,22 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @SuppressWarnings("resource") public class ShardRoutingRoleIT extends ESIntegTestCase { @@ -65,6 +77,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase { public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin { volatile int numIndexingCopies = 1; + static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly"; @Override public ShardRoutingRoleStrategy getShardRoutingRoleStrategy() { @@ -93,12 +106,55 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n } return super.canForceAllocatePrimary(shardRouting, node, allocation); } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + var nodesWithUnpromotableOnly = allocation.getClusterState() + .nodes() + .stream() + .filter(n -> Objects.equals("true", n.getAttributes().get(NODE_ATTR_UNPROMOTABLE_ONLY))) + .map(DiscoveryNode::getName) + .collect(Collectors.toUnmodifiableSet()); + if (nodesWithUnpromotableOnly.isEmpty() == false) { + if (nodesWithUnpromotableOnly.contains(node.node().getName())) { + if (shardRouting.isPromotableToPrimary()) { + return allocation.decision( + Decision.NO, + "test", + "shard is promotable to primary so may not be assigned to [" + node.node().getName() + "]" + ); + } + } else { + if (shardRouting.isPromotableToPrimary() == false) { + return allocation.decision( + Decision.NO, + "test", + "shard is not promotable to primary so may not be assigned to [" + node.node().getName() + "]" + ); + } + } + } + return Decision.YES; + } }); } @Override public Optional getEngineFactory(IndexSettings indexSettings) { - return Optional.of(config -> config.isPromotableToPrimary() ? new InternalEngine(config) : new NoOpEngine(config)); + return Optional.of(config -> { + if (config.isPromotableToPrimary()) { + return new InternalEngine(config); + } else { + try { + config.getStore().createEmpty(); + } catch (IOException e) { + logger.error("Error creating empty store", e); + throw new RuntimeException(e); + } + + return new NoOpEngine(EngineTestCase.copy(config, () -> -1L)); + } + }); } } @@ -109,7 +165,7 @@ protected boolean addMockInternalEngine() { @Override protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); + return CollectionUtils.concatLists(List.of(MockTransportService.TestPlugin.class, TestPlugin.class), super.nodePlugins()); } @Override @@ -193,11 +249,32 @@ private static void assertRolesInRoutingTableXContent(ClusterState state) { } } - public void testShardCreation() { + private static void installMockTransportVerifications(RoutingTableWatcher routingTableWatcher) { + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (routingTableWatcher.numIndexingCopies == 1) { + assertThat("no recovery action should be exchanged", action, not(startsWith("internal:index/shard/recovery/"))); + assertThat("no replicated action should be exchanged", action, not(containsString("[r]"))); + } + connection.sendRequest(requestId, action, request, options); + }); + mockTransportService.addRequestHandlingBehavior( + TransportUnpromotableShardRefreshAction.NAME, + (handler, request, channel, task) -> { + // Skip handling the request and send an immediate empty response + channel.sendResponse(ActionResponse.Empty.INSTANCE); + } + ); + } + } + + public void testShardCreation() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -234,6 +311,7 @@ public void testShardCreation() { ensureGreen(INDEX_NAME); assertEngineTypes(); + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); // removing replicas drops SEARCH_ONLY copies first while (routingTableWatcher.numReplicas > 0) { @@ -341,6 +419,7 @@ public void testPromotion() { var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -399,7 +478,7 @@ public AllocationCommand getCancelPrimaryCommand() { return null; } - public void testSearchRouting() { + public void testSearchRouting() throws Exception { var routingTableWatcher = new RoutingTableWatcher(); routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas); @@ -407,6 +486,7 @@ public void testSearchRouting() { getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1); + installMockTransportVerifications(routingTableWatcher); final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); try { @@ -414,7 +494,7 @@ public void testSearchRouting() { masterClusterService.addListener(routingTableWatcher); createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings()); - // TODO index some documents here once recovery/replication ignore unpromotable shards + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); ensureGreen(INDEX_NAME); assertEngineTypes(); @@ -483,6 +563,7 @@ public void testClosedIndex() { var numDataNodes = routingTableWatcher.numReplicas + 2; internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); @@ -501,4 +582,86 @@ public void testClosedIndex() { masterClusterService.removeListener(routingTableWatcher); } } + + public void testRefreshOfUnpromotableShards() throws Exception { + var routingTableWatcher = new RoutingTableWatcher(); + + var numDataNodes = routingTableWatcher.numReplicas + 2; + internalCluster().ensureAtLeastNumDataNodes(numDataNodes); + installMockTransportVerifications(routingTableWatcher); + getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; + final AtomicInteger refreshUnpromotableActions = new AtomicInteger(0); + + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.startsWith(TransportUnpromotableShardRefreshAction.NAME)) { + refreshUnpromotableActions.incrementAndGet(); + } + connection.sendRequest(requestId, action, request, options); + }); + } + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + try { + // verify the correct number of shard copies of each role as the routing table evolves + masterClusterService.addListener(routingTableWatcher); + + createIndex( + INDEX_NAME, + Settings.builder() + .put(routingTableWatcher.getIndexSettings()) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build() + ); + ensureGreen(INDEX_NAME); + assertEngineTypes(); + + indexRandom(true, INDEX_NAME, randomIntBetween(1, 10)); + + // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica shards + assertThat( + refreshUnpromotableActions.get(), + is(equalTo((routingTableWatcher.numReplicas - (routingTableWatcher.numIndexingCopies - 1)) * routingTableWatcher.numShards)) + ); + } finally { + masterClusterService.removeListener(routingTableWatcher); + } + } + + public void testNodesWithUnpromotableShardsNeverGetReplicationActions() throws Exception { + var routingTableWatcher = new RoutingTableWatcher(); + var additionalNumberOfNodesWithUnpromotableShards = randomIntBetween(1, 3); + routingTableWatcher.numReplicas = routingTableWatcher.numIndexingCopies + additionalNumberOfNodesWithUnpromotableShards - 1; + internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numIndexingCopies + 1); + final List nodesWithUnpromotableOnly = internalCluster().startDataOnlyNodes( + additionalNumberOfNodesWithUnpromotableShards, + Settings.builder().put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true").build() + ); + installMockTransportVerifications(routingTableWatcher); + getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies; + + for (var transportService : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = (MockTransportService) transportService; + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (nodesWithUnpromotableOnly.contains(connection.getNode().getName())) { + assertThat(action, not(containsString("[r]"))); + } + connection.sendRequest(requestId, action, request, options); + }); + } + + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + try { + // verify the correct number of shard copies of each role as the routing table evolves + masterClusterService.addListener(routingTableWatcher); + createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings()); + ensureGreen(INDEX_NAME); + indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100)); + } finally { + masterClusterService.removeListener(routingTableWatcher); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java deleted file mode 100644 index a10d03bf30c10..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.TransportVersion; -import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.tasks.TaskId; - -import java.io.IOException; - -public class ReplicaShardRefreshRequest extends ReplicationRequest { - - @Nullable - private final Long segmentGeneration; - - public ReplicaShardRefreshRequest(ShardId shardId, TaskId parentTaskId, @Nullable Long segmentGeneration) { - super(shardId); - setParentTask(parentTaskId); - this.segmentGeneration = segmentGeneration; - } - - public ReplicaShardRefreshRequest(StreamInput in) throws IOException { - super(in); - if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_7_0)) { - this.segmentGeneration = in.readOptionalVLong(); - } else { - this.segmentGeneration = null; - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_7_0)) { - out.writeOptionalVLong(segmentGeneration); - } - } - - @Nullable - public Long getSegmentGeneration() { - return segmentGeneration; - } - - @Override - public String toString() { - return "ReplicaShardRefreshRequest{" + shardId + '}'; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 53c83a99183d8..c7e7ab9733827 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -9,30 +9,38 @@ package org.elasticsearch.action.admin.indices.refresh; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Predicate; +import java.util.stream.Collectors; public class TransportShardRefreshAction extends TransportReplicationAction< BasicReplicationRequest, - ReplicaShardRefreshRequest, + BasicReplicationRequest, ReplicationResponse> { private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class); @@ -41,8 +49,6 @@ public class TransportShardRefreshAction extends TransportReplicationAction< public static final ActionType TYPE = new ActionType<>(NAME, ReplicationResponse::new); public static final String SOURCE_API = "api"; - private final Settings settings; - @Inject public TransportShardRefreshAction( Settings settings, @@ -63,10 +69,10 @@ public TransportShardRefreshAction( shardStateAction, actionFilters, BasicReplicationRequest::new, - ReplicaShardRefreshRequest::new, + BasicReplicationRequest::new, ThreadPool.Names.REFRESH ); - this.settings = settings; + new TransportUnpromotableShardRefreshAction(transportService, actionFilters, indicesService); } @Override @@ -78,31 +84,53 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep protected void shardOperationOnPrimary( BasicReplicationRequest shardRequest, IndexShard primary, - ActionListener> listener + ActionListener> listener ) { - ActionListener.completeWith(listener, () -> { + try (var listeners = new RefCountingListener(listener.map(v -> new PrimaryResult<>(shardRequest, new ReplicationResponse())))) { var refreshResult = primary.refresh(SOURCE_API); logger.trace("{} refresh request executed on primary", primary.shardId()); - var shardRefreshRequest = new ReplicaShardRefreshRequest( - primary.shardId(), - shardRequest.getParentTask(), - refreshResult.generation() - ); - return new PrimaryResult<>(shardRefreshRequest, new ReplicationResponse()); - }); + + // Forward the request to all nodes that hold unpromotable replica shards + final ClusterState clusterState = clusterService.state(); + final Task parentTaskId = taskManager.getTask(shardRequest.getParentTask().getId()); + clusterState.routingTable() + .shardRoutingTable(shardRequest.shardId()) + .assignedShards() + .stream() + .filter(Predicate.not(ShardRouting::isPromotableToPrimary)) + .map(ShardRouting::currentNodeId) + .collect(Collectors.toUnmodifiableSet()) + .forEach(nodeId -> { + final DiscoveryNode node = clusterState.nodes().get(nodeId); + UnpromotableShardRefreshRequest request = new UnpromotableShardRefreshRequest( + primary.shardId(), + refreshResult.generation() + ); + logger.trace("forwarding refresh request [{}] to node [{}]", request, node); + transportService.sendChildRequest( + node, + TransportUnpromotableShardRefreshAction.NAME, + request, + parentTaskId, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + listeners.acquire(ignored -> {}), + (in) -> TransportResponse.Empty.INSTANCE, + ThreadPool.Names.REFRESH + ) + ); + }); + } catch (Exception e) { + listener.onFailure(e); + } } @Override - protected void shardOperationOnReplica(ReplicaShardRefreshRequest request, IndexShard replica, ActionListener listener) { - if (DiscoveryNode.isStateless(settings) && replica.routingEntry().isPromotableToPrimary() == false) { - assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION; - replica.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> new ReplicaResult())); - } else { - ActionListener.completeWith(listener, () -> { - replica.refresh(SOURCE_API); - logger.trace("{} refresh request executed on replica", replica.shardId()); - return new ReplicaResult(); - }); - } + protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.refresh(SOURCE_API); + logger.trace("{} refresh request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java new file mode 100644 index 0000000000000..500a53513a60b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.refresh; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public class TransportUnpromotableShardRefreshAction extends HandledTransportAction { + public static final String NAME = RefreshAction.NAME + "[u]"; + + private final IndicesService indicesService; + + @Inject + public TransportUnpromotableShardRefreshAction( + TransportService transportService, + ActionFilters actionFilters, + IndicesService indicesService + ) { + super(NAME, transportService, actionFilters, UnpromotableShardRefreshRequest::new, ThreadPool.Names.REFRESH); + this.indicesService = indicesService; + } + + @Override + protected void doExecute(Task task, UnpromotableShardRefreshRequest request, ActionListener responseListener) { + ActionListener.run(responseListener, listener -> { + assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION + : "The request segment is " + request.getSegmentGeneration(); + IndexShard shard = indicesService.indexServiceSafe(request.getShardId().getIndex()).getShard(request.getShardId().id()); + shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE)); + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java new file mode 100644 index 0000000000000..52ef3917ce722 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.refresh; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +public class UnpromotableShardRefreshRequest extends ActionRequest { + + private final ShardId shardId; + private final long segmentGeneration; + + public UnpromotableShardRefreshRequest(final ShardId shardId, long segmentGeneration) { + this.shardId = shardId; + this.segmentGeneration = segmentGeneration; + } + + public UnpromotableShardRefreshRequest(StreamInput in) throws IOException { + super(in); + shardId = new ShardId(in); + segmentGeneration = in.readVLong(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeVLong(segmentGeneration); + } + + public ShardId getShardId() { + return shardId; + } + + public long getSegmentGeneration() { + return segmentGeneration; + } + + @Override + public String toString() { + return "UnpromotableShardRefreshRequest{" + "shardId=" + shardId + ", segmentGeneration=" + segmentGeneration + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 8ec274bc410f6..6b1916b4ec843 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -213,6 +213,7 @@ private void performOnReplica( final long maxSeqNoOfUpdatesOrDeletes, final PendingReplicationActions pendingReplicationActions ) { + assert shard.isPromotableToPrimary() : "only promotable shards should receive replication requests"; if (logger.isTraceEnabled()) { logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index a26e36aa39f9b..0c62dce1b2209 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -136,7 +136,9 @@ boolean validate(Metadata metadata) { ); } final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(shardRouting.id()); - if (shardRouting.active() && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { + if (shardRouting.active() + && shardRouting.isPromotableToPrimary() + && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { throw new IllegalStateException( "active shard routing " + shardRouting diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 0dd85d873463d..3a5a369caa3f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -499,15 +499,17 @@ public ShardRouting getByAllocationId(String allocationId) { return null; } - public Set getAllAllocationIds() { + public Set getPromotableAllocationIds() { assert MasterService.assertNotMasterUpdateThread("not using this on the master thread so we don't have to pre-compute this"); Set allAllocationIds = new HashSet<>(); for (ShardRouting shard : shards) { - if (shard.relocating()) { - allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId()); - } - if (shard.assignedToNode()) { - allAllocationIds.add(shard.allocationId().getId()); + if (shard.isPromotableToPrimary()) { + if (shard.relocating()) { + allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId()); + } + if (shard.assignedToNode()) { + allAllocationIds.add(shard.allocationId().getId()); + } } } return allAllocationIds; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java index 469e7f7efe36c..e0b53e312e400 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -69,12 +69,14 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha + "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to have the same"; - Updates updates = changes(startedShard.shardId()); - updates.addedAllocationIds.add(startedShard.allocationId().getId()); - if (startedShard.primary() - // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state - && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) { - updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); + if (startedShard.isPromotableToPrimary()) { + Updates updates = changes(startedShard.shardId()); + updates.addedAllocationIds.add(startedShard.allocationId().getId()); + if (startedShard.primary() + // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state + && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) { + updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 86290ca79a65a..12ae735d16b55 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -263,6 +263,7 @@ public synchronized RetentionLeases getRetentionLeases(final boolean expireLease final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); final Set leaseIdsForCurrentPeers = routingTable.assignedShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) .collect(Collectors.toSet()); final boolean allShardsStarted = routingTable.allShardsStarted(); @@ -607,7 +608,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { boolean renewalNeeded = false; for (int copy = 0; copy < routingTable.size(); copy++) { final ShardRouting shardRouting = routingTable.shard(copy); - if (shardRouting.assignedToNode() == false) { + if (shardRouting.assignedToNode() == false || shardRouting.isPromotableToPrimary() == false) { continue; } final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); @@ -628,7 +629,7 @@ public synchronized void renewPeerRecoveryRetentionLeases() { if (renewalNeeded) { for (int copy = 0; copy < routingTable.size(); copy++) { final ShardRouting shardRouting = routingTable.shard(copy); - if (shardRouting.assignedToNode()) { + if (shardRouting.assignedToNode() && shardRouting.isPromotableToPrimary()) { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease != null) { final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); @@ -874,8 +875,15 @@ private boolean invariant() { assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup()) : "cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup; + if (replicationGroup != null) { + assert replicationGroup.getReplicationTargets().stream().allMatch(ShardRouting::isPromotableToPrimary) + : "expected all replication target shards of the replication group to be promotable to primary"; + assert replicationGroup.getSkippedShards().stream().allMatch(ShardRouting::isPromotableToPrimary) + : "expected all skipped shards of the replication group to be promotable to primary"; + } + // all assigned shards from the routing table are tracked - assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getAllAllocationIds()) + assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getPromotableAllocationIds()) : "local checkpoints " + checkpoints + " not in-sync with routing table " + routingTable; for (Map.Entry entry : checkpoints.entrySet()) { @@ -895,7 +903,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { - if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { + if (shardRouting.isPromotableToPrimary() && checkpoints.get(shardRouting.allocationId().getId()).tracked) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( @@ -1151,6 +1159,7 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() { } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .allMatch( shardRouting -> retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) || checkpoints.get(shardRouting.allocationId().getId()).tracked == false @@ -1185,6 +1194,7 @@ public synchronized void updateFromMaster( // remove entries which don't exist on master Set initializingAllocationIds = routingTable.getAllInitializingShards() .stream() + .filter(ShardRouting::isPromotableToPrimary) .map(ShardRouting::allocationId) .map(AllocationId::getId) .collect(Collectors.toSet()); @@ -1495,7 +1505,10 @@ public synchronized boolean hasAllPeerRecoveryRetentionLeases() { */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { if (hasAllPeerRecoveryRetentionLeases == false) { - final List shardRoutings = routingTable.assignedShards(); + final List shardRoutings = routingTable.assignedShards() + .stream() + .filter(ShardRouting::isPromotableToPrimary) + .toList(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>( shardRoutings.size(), ActionListener.wrap(vs -> { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 280934c73364f..871928a96e4b6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -55,7 +55,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.CheckedConsumer; @@ -709,17 +708,26 @@ public void onFailure(Exception e) { if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) { final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); - final Set shardRoutings = Sets.newHashSetWithExpectedSize(routingTable.size()); + boolean allShardsUseRetentionLeases = true; for (int copy = 0; copy < routingTable.size(); copy++) { - shardRoutings.add(routingTable.shard(copy)); - } - shardRoutings.addAll(routingTable.assignedShards()); // include relocation targets - if (shardRoutings.stream() - .allMatch( - shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)) - )) { - useRetentionLeasesInPeerRecovery = true; + ShardRouting shardRouting = routingTable.shard(copy); + if (shardRouting.isPromotableToPrimary()) { + if (shardRouting.assignedToNode() == false + || retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRouting)) == false) { + allShardsUseRetentionLeases = false; + break; + } + if (this.shardRouting.relocating()) { + ShardRouting shardRoutingReloc = this.shardRouting.getTargetRelocatingShard(); + if (shardRoutingReloc.assignedToNode() == false + || retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardRoutingReloc)) == false) { + allShardsUseRetentionLeases = false; + break; + } + } + } } + useRetentionLeasesInPeerRecovery = allShardsUseRetentionLeases; } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java index cf3b8fc0fbaf3..53f932faf4512 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java @@ -40,11 +40,14 @@ public ReplicationGroup( this.trackedAllocationIds = trackedAllocationIds; this.version = version; - this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); + this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getPromotableAllocationIds()); this.replicationTargets = new ArrayList<>(); this.skippedShards = new ArrayList<>(); for (int copy = 0; copy < routingTable.size(); copy++) { ShardRouting shard = routingTable.shard(copy); + if (shard.isPromotableToPrimary() == false) { + continue; + } if (shard.unassigned()) { assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; skippedShards.add(shard); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 1d3120310f0c9..abd1ef4aaf958 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -216,43 +216,62 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi } final RecoveryTarget recoveryTarget = recoveryRef.target(); assert recoveryTarget.sourceNode() != null : "cannot do a recovery without a source node"; - final RecoveryState.Timer timer = recoveryTarget.state().getTimer(); + final RecoveryState recoveryState = recoveryTarget.state(); + final RecoveryState.Timer timer = recoveryState.getTimer(); + final IndexShard indexShard = recoveryTarget.indexShard(); + + final var failureHandler = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> { + // this will be logged as warning later on... + logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); + onGoingRecoveries.failRecovery( + recoveryId, + new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), + true + ); + }), recoveryRef::close)); - record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} - final ActionListener toSendListener = ActionListener.notifyOnce( - ActionListener.runBefore(new ActionListener<>() { - @Override - public void onResponse(StartRecoveryRequestToSend r) { - logger.trace( - "{} [{}]: recovery from {}", - r.startRecoveryRequest().shardId(), - r.actionName(), - r.startRecoveryRequest().sourceNode() - ); - transportService.sendRequest( - r.startRecoveryRequest().sourceNode(), - r.actionName(), - r.requestToSend(), - new RecoveryResponseHandler(r.startRecoveryRequest(), timer) - ); - } + if (indexShard.routingEntry().isPromotableToPrimary() == false) { + assert preExistingRequest == null; + assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false; + try { + indexShard.preRecovery(failureHandler.map(v -> { + logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); + indexShard.prepareForIndexRecovery(); + // Skip unnecessary intermediate stages + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + indexShard.openEngineAndSkipTranslogRecovery(); + recoveryState.getIndex().setFileDetailsComplete(); + recoveryState.setStage(RecoveryState.Stage.FINALIZE); + onGoingRecoveries.markRecoveryAsDone(recoveryId); + return null; + })); + } catch (Exception e) { + failureHandler.onFailure(e); + } - @Override - public void onFailure(Exception e) { - // this will be logged as warning later on... - logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); - onGoingRecoveries.failRecovery( - recoveryId, - new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), - true - ); - } - }, recoveryRef::close) - ); + return; + } + + record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {} + final ActionListener toSendListener = failureHandler.map(r -> { + logger.trace( + "{} [{}]: recovery from {}", + r.startRecoveryRequest().shardId(), + r.actionName(), + r.startRecoveryRequest().sourceNode() + ); + transportService.sendRequest( + r.startRecoveryRequest().sourceNode(), + r.actionName(), + r.requestToSend(), + new RecoveryResponseHandler(r.startRecoveryRequest(), timer) + ); + return null; + }); if (preExistingRequest == null) { try { - final IndexShard indexShard = recoveryTarget.indexShard(); indexShard.preRecovery(toSendListener.delegateFailure((l, v) -> ActionListener.completeWith(l, () -> { logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index d896425eef2cf..59ed1ba2b871f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -164,6 +164,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) { throw new IndexShardClosedException(shardId); } assert recoveryRef.target().shardId().equals(shardId); + assert recoveryRef.target().indexShard().routingEntry().isPromotableToPrimary(); return recoveryRef; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 8dd0e89e1cbbe..cf096e35bdbc0 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -234,7 +234,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { final long primaryTerm = indexMetadata.primaryTerm(0); final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0); - final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + final Set trackedShards = shardRoutingTable.getPromotableAllocationIds(); List unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards()); IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable); diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 4a3498ea6baae..919737caf2c7a 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -152,7 +152,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { new ReplicationGroup( shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), - shardRoutingTable.getAllAllocationIds(), + shardRoutingTable.getPromotableAllocationIds(), 0 ) ); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8dab09fb6015f..543b673635ee0 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -507,7 +507,7 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception { final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); - final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + final Set trackedShards = shardRoutingTable.getPromotableAllocationIds(); final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); final Thread testThread = Thread.currentThread(); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 97605ec71928f..e64dddff3cdd3 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -928,7 +928,12 @@ public void testSeqNoIsSetOnPrimary() { Set inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) : clusterService.state().metadata().index(index).inSyncAllocationIds(0); - ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncIds, shardRoutingTable.getAllAllocationIds(), 0); + ReplicationGroup replicationGroup = new ReplicationGroup( + shardRoutingTable, + inSyncIds, + shardRoutingTable.getPromotableAllocationIds(), + 0 + ); when(shard.getReplicationGroup()).thenReturn(replicationGroup); PendingReplicationActions replicationActions = new PendingReplicationActions(shardId, threadPool); replicationActions.accept(replicationGroup); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index e4c13a13a16ba..f26caf2f98e5e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -134,7 +134,7 @@ public void testToXContent() throws IOException { IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -374,7 +374,7 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); builder.startObject(); @@ -606,7 +606,7 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti IndexRoutingTable index = clusterState.getRoutingTable().getIndicesRouting().get("index"); String ephemeralId = clusterState.getNodes().get("nodeId1").getEphemeralId(); - String allocationId = index.shard(0).getAllAllocationIds().iterator().next(); + String allocationId = index.shard(0).getPromotableAllocationIds().iterator().next(); XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); builder.startObject(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java index fedbc31fcdeb7..c82b13918835e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java @@ -162,7 +162,11 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE postTable = state.routingTable().index("index").shard(0); assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted()); - assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(is(in(preTable.getAllAllocationIds())))); + assertThat( + postTable.toString(), + postTable.getPromotableAllocationIds(), + everyItem(is(in(preTable.getPromotableAllocationIds()))) + ); } else { // fake an election where conflicting nodes are removed and readded state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build(); @@ -199,7 +203,7 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE .map(shr -> shr.allocationId().getId()) .collect(Collectors.toSet()); - assertThat(postTable.toString(), unchangedAllocationIds, everyItem(is(in(postTable.getAllAllocationIds())))); + assertThat(postTable.toString(), unchangedAllocationIds, everyItem(is(in(postTable.getPromotableAllocationIds())))); RoutingNodesHelper.asStream(postTable).forEach(shardRouting -> { if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 277dfeb913525..68b4f18fbcfd2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -249,7 +249,7 @@ public static FieldType randomIdFieldType() { return randomBoolean() ? ProvidedIdFieldMapper.Defaults.FIELD_TYPE : TsidExtractingIdFieldMapper.FIELD_TYPE; } - public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) { + public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) { return new EngineConfig( config.getShardId(), config.getThreadPool(),