Skip to content

Commit

Permalink
Unpromotables skip replication and peer recovery (elastic#93210)
Browse files Browse the repository at this point in the history
For skipping replication:
* ReplicationTracker and Group keep only the shards that are promotable to primary
* Remove unpromotable shards from in sync allocations in metadata
* There is a new Refresh action for unpromotable replica shards

Fixes ES-4861

For skipping peer recovery:
* Unpromotable shards pass directly to STARTED, skipping some intermediate peer recovery stages and messages

Fixes ES-5257
  • Loading branch information
kingherc authored and mark-vieira committed Jan 31, 2023
1 parent 0603827 commit 39ba013
Show file tree
Hide file tree
Showing 22 changed files with 464 additions and 160 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/93210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93210
summary: Unpromotables skip replication and peer recovery
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportUnpromotableShardRefreshAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -39,23 +43,31 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@SuppressWarnings("resource")
public class ShardRoutingRoleIT extends ESIntegTestCase {
Expand All @@ -65,6 +77,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
public static class TestPlugin extends Plugin implements ClusterPlugin, EnginePlugin {

volatile int numIndexingCopies = 1;
static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly";

@Override
public ShardRoutingRoleStrategy getShardRoutingRoleStrategy() {
Expand Down Expand Up @@ -93,12 +106,55 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
}
return super.canForceAllocatePrimary(shardRouting, node, allocation);
}

/**
 * Test-only allocation rule that segregates shard copies by role once at least one node in the
 * cluster state carries the {@code unpromotableonly=true} node attribute:
 * <ul>
 *   <li>nodes carrying the attribute reject shards that are promotable to primary;</li>
 *   <li>all other nodes reject shards that are not promotable to primary.</li>
 * </ul>
 * When no node carries the attribute the rule does not restrict anything.
 *
 * @param shardRouting the shard copy being considered
 * @param node         the candidate node
 * @param allocation   the ongoing allocation, used to read the cluster state and to build decisions
 * @return {@link Decision#YES}, or a {@code NO} decision labelled {@code "test"} explaining the mismatch
 */
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    // Names of every node in the current cluster state flagged as hosting unpromotable shards only.
    final var unpromotableOnlyNodeNames = allocation.getClusterState()
        .nodes()
        .stream()
        .filter(discoveryNode -> Objects.equals("true", discoveryNode.getAttributes().get(NODE_ATTR_UNPROMOTABLE_ONLY)))
        .map(DiscoveryNode::getName)
        .collect(Collectors.toUnmodifiableSet());
    if (unpromotableOnlyNodeNames.isEmpty()) {
        // No node is flagged, so there is nothing to enforce.
        return Decision.YES;
    }

    final String targetNodeName = node.node().getName();
    final boolean targetIsUnpromotableOnly = unpromotableOnlyNodeNames.contains(targetNodeName);
    final boolean shardIsPromotable = shardRouting.isPromotableToPrimary();

    if (targetIsUnpromotableOnly && shardIsPromotable) {
        return allocation.decision(
            Decision.NO,
            "test",
            "shard is promotable to primary so may not be assigned to [" + targetNodeName + "]"
        );
    }
    if (targetIsUnpromotableOnly == false && shardIsPromotable == false) {
        return allocation.decision(
            Decision.NO,
            "test",
            "shard is not promotable to primary so may not be assigned to [" + targetNodeName + "]"
        );
    }
    return Decision.YES;
}
});
}

/**
 * Supplies the engine factory used by this test plugin.
 * <p>
 * Shards that are promotable to primary get a regular {@link InternalEngine}. Shards that are not
 * promotable get a {@link NoOpEngine}; because such shards do not go through peer recovery, the
 * store is first initialised as empty so the engine has something to open.
 * <p>
 * The superseded single-expression {@code return} that used to precede this factory has been
 * removed: it made the factory below unreachable and never created the empty store.
 *
 * @param indexSettings settings of the index the engine is created for (not consulted here)
 * @return an always-present factory choosing the engine type from the shard's promotability
 */
@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
    return Optional.of(config -> {
        if (config.isPromotableToPrimary()) {
            return new InternalEngine(config);
        } else {
            try {
                config.getStore().createEmpty();
            } catch (IOException e) {
                logger.error("Error creating empty store", e);
                throw new RuntimeException(e);
            }

            // NOTE(review): the supplier passed to EngineTestCase.copy presumably provides the global
            // checkpoint (-1 meaning "none yet") - confirm against EngineTestCase.
            return new NoOpEngine(EngineTestCase.copy(config, () -> -1L));
        }
    });
}
}

Expand All @@ -109,7 +165,7 @@ protected boolean addMockInternalEngine() {

/**
 * Registers the plugins every node of the test cluster is started with: the mock transport plugin
 * (so tests can cast each {@code TransportService} to {@link MockTransportService} and intercept
 * messages), this test's own {@link TestPlugin}, and whatever the superclass already provides.
 * <p>
 * The superseded {@code appendToCopy} return that used to precede this statement has been removed:
 * it made the statement below unreachable and did not install the mock transport plugin.
 *
 * @return the plugin classes to install on each node
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    return CollectionUtils.concatLists(List.of(MockTransportService.TestPlugin.class, TestPlugin.class), super.nodePlugins());
}

@Override
Expand Down Expand Up @@ -193,11 +249,32 @@ private static void assertRolesInRoutingTableXContent(ClusterState state) {
}
}

public void testShardCreation() {
/**
 * Hooks into the transport service of every node in the internal test cluster to:
 * <ul>
 *   <li>assert, when the watcher expects a single indexing copy, that no outgoing action is a peer
 *       recovery action or a replica ({@code [r]}) action, before forwarding the request unchanged;</li>
 *   <li>short-circuit incoming {@link TransportUnpromotableShardRefreshAction} requests with an
 *       immediate empty response instead of running the real handler.</li>
 * </ul>
 *
 * @param routingTableWatcher supplies the expected number of indexing copies, read on every send
 */
private static void installMockTransportVerifications(RoutingTableWatcher routingTableWatcher) {
    internalCluster().getInstances(TransportService.class).forEach(service -> {
        final var mockService = (MockTransportService) service;

        mockService.addSendBehavior((connection, requestId, action, request, options) -> {
            if (routingTableWatcher.numIndexingCopies == 1) {
                assertThat("no recovery action should be exchanged", action, not(startsWith("internal:index/shard/recovery/")));
                assertThat("no replicated action should be exchanged", action, not(containsString("[r]")));
            }
            connection.sendRequest(requestId, action, request, options);
        });

        // The real handler is bypassed; the sender just gets an empty acknowledgement straight away.
        mockService.addRequestHandlingBehavior(
            TransportUnpromotableShardRefreshAction.NAME,
            (handler, request, channel, task) -> channel.sendResponse(ActionResponse.Empty.INSTANCE)
        );
    });
}

public void testShardCreation() throws Exception {
var routingTableWatcher = new RoutingTableWatcher();

var numDataNodes = routingTableWatcher.numReplicas + 2;
internalCluster().ensureAtLeastNumDataNodes(numDataNodes);
installMockTransportVerifications(routingTableWatcher);
getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies;

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
Expand Down Expand Up @@ -234,6 +311,7 @@ public void testShardCreation() {

ensureGreen(INDEX_NAME);
assertEngineTypes();
indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100));

// removing replicas drops SEARCH_ONLY copies first
while (routingTableWatcher.numReplicas > 0) {
Expand Down Expand Up @@ -341,6 +419,7 @@ public void testPromotion() {

var numDataNodes = routingTableWatcher.numReplicas + 2;
internalCluster().ensureAtLeastNumDataNodes(numDataNodes);
installMockTransportVerifications(routingTableWatcher);
getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies;

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
Expand Down Expand Up @@ -399,22 +478,23 @@ public AllocationCommand getCancelPrimaryCommand() {
return null;
}

public void testSearchRouting() {
public void testSearchRouting() throws Exception {

var routingTableWatcher = new RoutingTableWatcher();
routingTableWatcher.numReplicas = Math.max(1, routingTableWatcher.numReplicas);
routingTableWatcher.numIndexingCopies = Math.min(routingTableWatcher.numIndexingCopies, routingTableWatcher.numReplicas);
getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies;

internalCluster().ensureAtLeastNumDataNodes(routingTableWatcher.numReplicas + 1);
installMockTransportVerifications(routingTableWatcher);

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
try {
// verify the correct number of shard copies of each role as the routing table evolves
masterClusterService.addListener(routingTableWatcher);

createIndex(INDEX_NAME, routingTableWatcher.getIndexSettings());
// TODO index some documents here once recovery/replication ignore unpromotable shards
indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100));
ensureGreen(INDEX_NAME);
assertEngineTypes();

Expand Down Expand Up @@ -483,6 +563,7 @@ public void testClosedIndex() {

var numDataNodes = routingTableWatcher.numReplicas + 2;
internalCluster().ensureAtLeastNumDataNodes(numDataNodes);
installMockTransportVerifications(routingTableWatcher);
getMasterNodePlugin().numIndexingCopies = routingTableWatcher.numIndexingCopies;

final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
Expand All @@ -501,4 +582,86 @@ public void testClosedIndex() {
masterClusterService.removeListener(routingTableWatcher);
}
}

/**
 * Checks that a forced refresh after indexing sends the expected number of
 * {@link TransportUnpromotableShardRefreshAction} requests: one per unpromotable replica copy of
 * each shard. Periodic refreshes are disabled on the index so only the explicit refresh counts.
 */
public void testRefreshOfUnpromotableShards() throws Exception {
    final var watcher = new RoutingTableWatcher();

    final var dataNodeCount = watcher.numReplicas + 2;
    internalCluster().ensureAtLeastNumDataNodes(dataNodeCount);
    installMockTransportVerifications(watcher);
    getMasterNodePlugin().numIndexingCopies = watcher.numIndexingCopies;

    // Counts every outgoing request whose action name starts with the unpromotable refresh action name.
    // NOTE(review): confirm whether this send behavior replaces or stacks with the one registered by
    // installMockTransportVerifications above.
    final AtomicInteger sentUnpromotableRefreshes = new AtomicInteger(0);
    internalCluster().getInstances(TransportService.class).forEach(service -> {
        final var mockService = (MockTransportService) service;
        mockService.addSendBehavior((connection, requestId, action, request, options) -> {
            if (action.startsWith(TransportUnpromotableShardRefreshAction.NAME)) {
                sentUnpromotableRefreshes.incrementAndGet();
            }
            connection.sendRequest(requestId, action, request, options);
        });
    });

    final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    try {
        // verify the correct number of shard copies of each role as the routing table evolves
        masterClusterService.addListener(watcher);

        final var indexSettings = Settings.builder()
            .put(watcher.getIndexSettings())
            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false)
            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1)
            .build();
        createIndex(INDEX_NAME, indexSettings);
        ensureGreen(INDEX_NAME);
        assertEngineTypes();

        indexRandom(true, INDEX_NAME, randomIntBetween(1, 10));

        // Each primary will send a TransportUnpromotableShardRefreshAction to each of the unpromotable replica shards
        final var observedRefreshes = sentUnpromotableRefreshes.get();
        final var unpromotableCopiesPerShard = watcher.numReplicas - (watcher.numIndexingCopies - 1);
        assertThat(observedRefreshes, is(equalTo(unpromotableCopiesPerShard * watcher.numShards)));
    } finally {
        masterClusterService.removeListener(watcher);
    }
}

/**
 * Starts between one and three extra data-only nodes flagged with the
 * {@code unpromotableonly=true} attribute, sizes the replica count so that the number of
 * non-indexing copies per shard equals the number of flagged nodes, and asserts that no request
 * sent over a connection to a flagged node is a replica ({@code [r]}) action while the index is
 * created and documents are indexed.
 */
public void testNodesWithUnpromotableShardsNeverGetReplicationActions() throws Exception {
    final var watcher = new RoutingTableWatcher();
    final var extraUnpromotableNodes = randomIntBetween(1, 3);
    watcher.numReplicas = watcher.numIndexingCopies + extraUnpromotableNodes - 1;
    internalCluster().ensureAtLeastNumDataNodes(watcher.numIndexingCopies + 1);

    final var unpromotableOnlyAttribute = Settings.builder()
        .put("node.attr." + TestPlugin.NODE_ATTR_UNPROMOTABLE_ONLY, "true")
        .build();
    final List<String> unpromotableOnlyNodeNames = internalCluster().startDataOnlyNodes(
        extraUnpromotableNodes,
        unpromotableOnlyAttribute
    );
    installMockTransportVerifications(watcher);
    getMasterNodePlugin().numIndexingCopies = watcher.numIndexingCopies;

    // Fail the test as soon as any node tries to send a replica action to a flagged node.
    internalCluster().getInstances(TransportService.class).forEach(service -> {
        final var mockService = (MockTransportService) service;
        mockService.addSendBehavior((connection, requestId, action, request, options) -> {
            final String destinationNodeName = connection.getNode().getName();
            if (unpromotableOnlyNodeNames.contains(destinationNodeName)) {
                assertThat(action, not(containsString("[r]")));
            }
            connection.sendRequest(requestId, action, request, options);
        });
    });

    final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
    try {
        // verify the correct number of shard copies of each role as the routing table evolves
        masterClusterService.addListener(watcher);
        createIndex(INDEX_NAME, watcher.getIndexSettings());
        ensureGreen(INDEX_NAME);
        indexRandom(randomBoolean(), INDEX_NAME, randomIntBetween(50, 100));
    } finally {
        masterClusterService.removeListener(watcher);
    }
}

}

This file was deleted.

Loading

0 comments on commit 39ba013

Please sign in to comment.