Auto-expand replicas only after failing nodes #30553

Merged · 4 commits · May 14, 2018
Changes from 3 commits
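For context, the index.auto_expand_replicas index setting ties an index's replica count to the number of data nodes in the cluster: a range such as "0-all" means at least zero replicas and at most one copy per data node. A minimal sketch of creating such an index, mirroring the request built in AutoExpandReplicasTests further down (the index name and one-shard setup are illustrative):

    import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
    import org.elasticsearch.common.settings.Settings;

    import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
    import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;

    // One primary shard; the replica count then auto-expands to every data node.
    CreateIndexRequest request = new CreateIndexRequest("index",
        Settings.builder()
            .put(SETTING_NUMBER_OF_SHARDS, 1)
            .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all")
            .build());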
org.elasticsearch.cluster.routing.allocation.AllocationService

@@ -114,11 +114,24 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
     }
 
     protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
-        RoutingTable oldRoutingTable = oldState.routingTable();
-        RoutingNodes newRoutingNodes = allocation.routingNodes();
+        ClusterState newState = buildResult(oldState, allocation);
+
+        logClusterHealthStateChange(
+            new ClusterStateHealth(oldState),
+            new ClusterStateHealth(newState),
+            reason
+        );
+
+        return newState;
+    }
+
+    private ClusterState buildResult(ClusterState oldState, RoutingAllocation allocation) {
+        final RoutingTable oldRoutingTable = oldState.routingTable();
+        final RoutingNodes newRoutingNodes = allocation.routingNodes();
         final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build();
-        MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
+        final MetaData newMetaData = allocation.updateMetaDataWithRoutingChanges(newRoutingTable);
         assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
 
         final ClusterState.Builder newStateBuilder = ClusterState.builder(oldState)
             .routingTable(newRoutingTable)
             .metaData(newMetaData);
@@ -131,13 +144,7 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
                 newStateBuilder.customs(customsBuilder.build());
             }
         }
-        final ClusterState newState = newStateBuilder.build();
-        logClusterHealthStateChange(
-            new ClusterStateHealth(oldState),
-            new ClusterStateHealth(newState),
-            reason
-        );
-        return newState;
+        return newStateBuilder.build();
     }
 
     // Used for testing
@@ -209,24 +216,23 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis
      * if needed.
      */
     public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
-        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
-        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
+        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         routingNodes.unassigned().shuffle();
-        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
+        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
             clusterInfoService.getClusterInfo(), currentNanoTime());
 
         // first, clear from the shards any node id they used to belong to that is now dead
         deassociateDeadNodes(allocation);
 
-        if (reroute) {
-            reroute(allocation);
+        if (allocation.routingNodesChanged()) {
+            clusterState = buildResult(clusterState, allocation);
         }
-
-        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
+        if (reroute) {
+            return reroute(clusterState, reason);
+        } else {
             return clusterState;
         }
-        return buildResultAndLogHealthChange(clusterState, allocation, reason);
     }
 
     /**
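The reordering above is the core of the fix: deassociateDeadNodes now first fails the shards that lived on the removed nodes and applies that routing change via buildResult, and only afterwards hands off to reroute, which presumably performs the same adaptAutoExpandReplicas step that was removed from this method. To see why the order matters, here is a rough sketch of how an auto-expand range resolves against the data-node count; the helper is hypothetical, and the real logic lives in the AutoExpandReplicas settings class, which this diff does not touch:

    // Hypothetical helper: resolve an auto-expand range such as "0-all" or "0-1"
    // to a concrete replica count for a given number of data nodes.
    static int desiredReplicas(String range, int dataNodes) {
        String[] bounds = range.split("-", 2);
        int min = Integer.parseInt(bounds[0]);
        int max = "all".equals(bounds[1]) ? dataNodes - 1 : Integer.parseInt(bounds[1]);
        return Math.max(min, Math.min(max, dataNodes - 1)); // never more copies than nodes can hold
    }

When data nodes drop out, the resolved replica count shrinks. If that shrink ran before the dead nodes' shards were failed, it could remove an active replica on a surviving node while the primary still pointed at a dead node; running it only after failing and promoting shards avoids that.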
org.elasticsearch.discovery.zen.NodeJoinController

@@ -380,7 +380,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
     /**
      * a task indicating that the current node should become master, if no current master is known
      */
-    private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
+    public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_",
         new TransportAddress(TransportAddress.META_ADDRESS, 0),
         Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
         @Override
@@ -393,7 +393,7 @@ public String toString() {
      * a task that is used to signal the election is stopped and we should process pending joins.
      * it may be used in combination with {@link #BECOME_MASTER_TASK}
      */
-    private static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
+    public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
         new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
         @Override
         public String toString() {
org.elasticsearch.cluster.metadata.AutoExpandReplicasTests

@@ -18,8 +18,36 @@
  */
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.cluster.ClusterStateChanges;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
 
 public class AutoExpandReplicasTests extends ESTestCase {
 
@@ -72,4 +100,104 @@ public void testInvalidValues() {
         }
 
     }
+
+    private static final AtomicInteger nodeIdGenerator = new AtomicInteger();
+
+    protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
+        Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+        for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+            roles.add(mustHaveRole);
+        }
+        final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+            Version.CURRENT);
+    }
+
+    /**
+     * Checks that when nodes leave the cluster, the auto-expand-replica functionality only triggers after failing the shards on
+     * the removed nodes. This ensures that active shards on other live nodes are not failed if the primary resided on a now-dead node.
+     * Instead, one of the replicas on the live nodes first gets promoted to primary, and the auto-expansion (removing replicas) only
+     * triggers in a follow-up step.
+     */
+    public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedException {
+        final ThreadPool threadPool = new TestThreadPool(getClass().getName());
+        final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
+
+        try {
+            List<DiscoveryNode> allNodes = new ArrayList<>();
+            DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
+            allNodes.add(localNode);
+            int numDataNodes = randomIntBetween(3, 5);
+            List<DiscoveryNode> dataNodes = new ArrayList<>(numDataNodes);
+            for (int i = 0; i < numDataNodes; i++) {
+                dataNodes.add(createNode(DiscoveryNode.Role.DATA));
+            }
+            allNodes.addAll(dataNodes);
+            ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
+
+            CreateIndexRequest request = new CreateIndexRequest("index",
+                Settings.builder()
+                    .put(SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build())
+                .waitForActiveShards(ActiveShardCount.NONE);
+            state = cluster.createIndex(state, request);
+            assertTrue(state.metaData().hasIndex("index"));
+            while (state.routingTable().index("index").shard(0).allShardsStarted() == false) {
+                logger.info(state);
+                state = cluster.applyStartedShards(state,
+                    state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING));
+                state = cluster.reroute(state, new ClusterRerouteRequest());
+            }
+
+            IndexShardRoutingTable preTable = state.routingTable().index("index").shard(0);
+            final Set<String> unchangedNodeIds;
+            final IndexShardRoutingTable postTable;
+
+            if (randomBoolean()) {
+                // simulate node removal
+                List<DiscoveryNode> nodesToRemove = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> nodesToRemove.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                state = cluster.removeNodes(state, nodesToRemove);
+                postTable = state.routingTable().index("index").shard(0);
+
+                assertTrue("not all shards started in " + state.toString(), postTable.allShardsStarted());
+                assertThat(postTable.toString(), postTable.getAllAllocationIds(), everyItem(isIn(preTable.getAllAllocationIds())));
+            } else {
+                // fake an election where conflicting nodes are removed and re-added
+                state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).masterNodeId(null).build()).build();
+
+                List<DiscoveryNode> conflictingNodes = randomSubsetOf(2, dataNodes);
+                unchangedNodeIds = dataNodes.stream().filter(n -> conflictingNodes.contains(n) == false)
+                    .map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                List<DiscoveryNode> nodesToAdd = conflictingNodes.stream()
+                    .map(n -> new DiscoveryNode(n.getName(), n.getId(), buildNewFakeTransportAddress(), n.getAttributes(), n.getRoles(), n.getVersion()))
+                    .collect(Collectors.toList());
+
+                if (randomBoolean()) {
+                    nodesToAdd.add(createNode(DiscoveryNode.Role.DATA));
+                }
+
+                state = cluster.joinNodesAndBecomeMaster(state, nodesToAdd);
+                postTable = state.routingTable().index("index").shard(0);
+            }
+
+            Set<String> unchangedAllocationIds = preTable.getShards().stream().filter(shr -> unchangedNodeIds.contains(shr.currentNodeId()))
+                .map(shr -> shr.allocationId().getId()).collect(Collectors.toSet());
+
+            assertThat(postTable.toString(), unchangedAllocationIds, everyItem(isIn(postTable.getAllAllocationIds())));
+
+            postTable.getShards().forEach(
+                shardRouting -> {
+                    if (shardRouting.assignedToNode() && unchangedAllocationIds.contains(shardRouting.allocationId().getId())) {
+                        assertTrue("Shard should be active: " + shardRouting, shardRouting.active());
+                    }
+                }
+            );
+        } finally {
+            terminate(threadPool);
+        }
+    }
 }
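To make the scenario concrete, here is a worked example of the numbers involved, assuming the "0-all" resolution sketched earlier (node counts picked from the test's randomIntBetween(3, 5) range):

    // With 5 data nodes and "0-all", every data node holds one copy of the shard.
    int dataNodes = 5;
    int replicasBefore = dataNodes - 1;     // 4 replicas plus the primary = 5 copies
    // Two data nodes leave. The fix first fails the copies on the dead nodes,
    // promoting a surviving replica if the primary was among them...
    int dataNodesAfter = dataNodes - 2;     // 3 data nodes remain
    // ...and only the follow-up reroute shrinks the replica count.
    int replicasAfter = dataNodesAfter - 1; // 2 replicas plus the primary = 3 copies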
org.elasticsearch.indices.cluster.ClusterStateChanges

@@ -87,6 +87,7 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -232,6 +233,15 @@ public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> node
         return runTasks(joinTaskExecutor, clusterState, nodes);
     }
 
+    public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
+        List<DiscoveryNode> joinNodes = new ArrayList<>();
+        joinNodes.add(NodeJoinController.BECOME_MASTER_TASK);
+        joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK);
+        joinNodes.addAll(nodes);
+
+        return runTasks(joinTaskExecutor, clusterState, joinNodes);
+    }
+
     public ClusterState removeNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
         return runTasks(nodeRemovalExecutor, clusterState, nodes.stream()
             .map(n -> new ZenDiscovery.NodeRemovalClusterStateTaskExecutor.Task(n, "dummy reason")).collect(Collectors.toList()));