Promote replica on the highest version node #25277
Conversation
This changes the replica selection to prefer replicas on the highest version node when choosing a replacement to promote after the primary shard fails. Consider this situation:

- A replica on a 5.6 node
- Another replica on a 6.0 node
- The primary on a 6.0 node

The primary shard is sending sequence numbers to the replica on the 6.0 node and skipping them for the 5.6 node. Now assume that the primary shard fails and (prior to this change) the replica on the 5.6 node gets promoted to primary: it has no knowledge of sequence numbers, while the replica on the 6.0 node will be expecting sequence numbers but will never receive them.

Relates to elastic#10708
Commits 1e7ea04 to 3aa3edc
@jasontedor could you take a look at this please?
I've left some comments and suggestions. This also needs to go into 5.6 as a 5.6 master in a mixed 5.6/6.x cluster should be running this code as well.
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
Version replicaNodeVersion = nodesToVersions.get(shardRouting.currentNodeId());
The `nodesToVersions` map is not needed. You can get the version using `node(shardRouting.currentNodeId()).node().getVersion()`. Not having this extra `nodesToVersions` map also solves consistency issues where entries are removed from `nodesToShards` but not from `nodesToVersions`.
Ahh thanks, I didn't know about that; I removed the map.
if (replicaNodeVersion == null && candidate == null) {
    // Only use this replica if there are no other candidates
    candidate = shardRouting;
} else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) {
This method looks like it could enjoy Java 8 lambdas, for example something along the lines of:
return assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
.orElse(null);
mutters something about Java pretending to be a functional language
I don't agree with the word "enjoy" (I think the lambda version is messier than the non-lambda version, since there are no monads in Java), but I did this because you asked for it.
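For readers following along, the key piece in the suggested lambda is `Comparator.nullsFirst`: replicas whose node (and therefore version) is unknown sort below every replica with a known version, so they are only chosen when nothing better exists. Below is a minimal, self-contained sketch of that pattern; the `Replica` record and the integer versions are stand-ins for illustration, not Elasticsearch types.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class NullsFirstPromotionDemo {
    // Stand-in for a shard copy whose node version may be unknown (null).
    record Replica(String name, Integer nodeVersion) {}

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("replica-on-5.6-node", 56),
                new Replica("replica-on-6.0-node", 60),
                new Replica("replica-on-unknown-node", null));

        // nullsFirst puts version-less replicas at the bottom of the ordering,
        // so max() only picks one of them if no versioned replica exists.
        Comparator<Replica> byVersionNullsFirst = Comparator.comparing(
                Replica::nodeVersion, Comparator.nullsFirst(Comparator.naturalOrder()));

        Optional<Replica> promoted = replicas.stream().max(byVersionNullsFirst);

        // Prints "promote: replica-on-6.0-node".
        promoted.ifPresent(r -> System.out.println("promote: " + r.name()));
    }
}
```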
// add a single node
clusterState = ClusterState.builder(clusterState).nodes(
        DiscoveryNodes.builder()
            .add(newNode("node1-5.x", Version.V_5_6_0)))
Can you generalize the test to use two arbitrary (but distinct) versions? i.e. `VersionUtils.randomVersion()`
No? Currently the only situation that is valid for a mixed-major-version cluster is 5.6 and 6.0; we don't support mixed clusters of any other versions, and 5.6.1 isn't out yet. I'm not sure how randomization would help here, other than triggering some other version-related failures :)
this PR does more than just an ordering on 5.6/6.x. It also orders among 6.0 and 6.1 nodes, which is left untested here. Either we restrict the "Promote replica on the highest version node" logic to only order 6.x nodes before 5.6 (and leave 6.0 and 6.1 unordered) or we test that this logic also properly orders 6.0 and 6.1. I agree there is no need to test 5.1 and 6.2.
Okay, I randomized the versions
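For illustration, a hedged sketch of how the two node versions could be randomized while staying distinct; `VersionUtils.randomVersion(Random)` and `randomValueOtherThan` are assumed to be the usual ES test-framework helpers, and the node names here are placeholders.

```java
// pick two distinct random versions for the two nodes (helper usage is an assumption)
Version version1 = VersionUtils.randomVersion(random());
Version version2 = randomValueOtherThan(version1, () -> VersionUtils.randomVersion(random()));

clusterState = ClusterState.builder(clusterState).nodes(
        DiscoveryNodes.builder()
                .add(newNode("node1", version1))
                .add(newNode("node2", version2)))
        .build();
```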
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);

// fail the primary shard, check replicas get removed as well...
replicas should not be removed? Looks like copy pasta from another test
Yep, I fixed this, thanks
@@ -556,4 +558,118 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
    ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
    assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
}

public void testReplicaOnNewestVersionIsPromoted() {
This test checks one specific scenario. I think that it can easily be generalized in the way of `IndicesClusterStateServiceRandomUpdatesTests`, so that it simulates a large range of scenarios. Essentially it would boil down to creating a few nodes with random versions (see `randomInitialClusterState` of `IndicesClusterStateServiceRandomUpdatesTests`), allocating a few shards to the nodes (see `ClusterStateChanges.createIndex`), then failing some of the shards, including the primary (see `ClusterStateChanges.applyFailedShards`), or failing some of the nodes, including the one holding the primary (see `ClusterStateChanges.deassociateDeadNodes`), and then checking that the new primary is on the node with the highest version.
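A rough sketch of the shape such a randomized test could take, pieced together from the helpers named above and from the diff excerpts discussed below; the `randomInitialClusterState()` call, the `CreateIndexRequest` setup, and the final assertion are assumptions about the wiring, not the actual implementation.

```java
public void testReplicaOnNewestVersionIsPromoted() {
    // a few nodes with random (possibly mixed) versions, modeled on
    // IndicesClusterStateServiceRandomUpdatesTests#randomInitialClusterState (assumed helper)
    ClusterState state = randomInitialClusterState();

    // allocate a few shards and start them
    state = cluster.createIndex(state, new CreateIndexRequest("test"));
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));

    // fail some primaries (alternatively, remove whole nodes via deassociateDeadNodes)
    List<FailedShard> shardsToFail = new ArrayList<>();
    for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
        if (shardRouting.primary() && randomBoolean()) {
            shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
        }
    }
    state = cluster.applyFailedShards(state, shardsToFail);

    // finally, assert that each newly promoted primary sits on a node whose
    // version is at least as high as that of its remaining replicas
}
```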
@ywelsch I added a test similar to what we talked about, as well as addressing your other feedback; please take another look!
I left a few more comments about the tests.
}

logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
extra semicolon
logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
state = cluster.reroute(state, new ClusterRerouteRequest());
reroute happens as part of applyStartedShards in the line above
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
state = cluster.reroute(state, new ClusterRerouteRequest());
logger.info("--> starting replicas");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
there is no guarantee that all replicas are started (as we have throttling). It's good to test the situation where not all replicas are started though, so maybe we can call applyStartedShards a random number of times.
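For example, a hedged fragment in the style of the surrounding test code; `randomIntBetween` is the usual ESTestCase helper, and the bound of 3 rounds is arbitrary.

```java
// start shards a random number of rounds, so some replicas may remain
// unstarted because of allocation throttling
int startRounds = randomIntBetween(1, 3);
for (int i = 0; i < startRounds; i++) {
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}
```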
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
    if (shardRouting.primary() && randomBoolean()) {
        ShardRouting replicaToBePromoted = state.getRoutingNodes()
            .activeReplicaWithHighestVersion(shardRouting.shardId());
You're testing the method `activeReplicaWithHighestVersion` here using the method itself? I see no checks here that the primary is indeed on the node with the highest version. I think for the purpose of the test it is sufficient to check that:
- if there was at least one active replica while the primary was failed, a new active primary got assigned;
- the new active primary is on a node with a higher or equal version than the replicas.
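A hedged sketch of what those two checks could look like, reusing calls that appear elsewhere in this discussion; the index "test"/shard 0 lookup and the `Version.onOrAfter` comparison are assumptions about the final test.

```java
ShardRouting newPrimary = state.routingTable().index("test").shard(0).primaryShard();

// 1) a new active primary got assigned (there was at least one active replica left)
assertNotNull(newPrimary);
assertTrue(newPrimary.active());

// 2) the new primary's node version is >= the node version of every remaining replica
Version primaryVersion = state.getNodes().get(newPrimary.currentNodeId()).getVersion();
for (ShardRouting replica : state.getRoutingNodes().shardsWithState(STARTED)) {
    if (replica.primary() == false && replica.shardId().equals(newPrimary.shardId())) {
        Version replicaVersion = state.getNodes().get(replica.currentNodeId()).getVersion();
        assertTrue(primaryVersion.onOrAfter(replicaVersion));
    }
}
```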
I changed the test to verify candidates without using the `activeReplicaWithHighestVersion` method.
Settings.Builder settingsBuilder = Settings.builder()
    .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
    .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3))
    .put("index.routing.allocation.total_shards_per_node", 1);
why this?
I removed this :)
ClusterState previousState = state;
// apply cluster state to nodes (incl. master)
for (DiscoveryNode node : state.nodes()) {
    IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
This test does not require `IndicesClusterStateService`, only the `ClusterStateChanges` class. All the code in this block can go away; it does not add anything to the test.
The test can be put into FailedShardsRoutingTests.
It does use the `randomInitialClusterState` method, which I'm not sure we want to duplicate. Is it worth coupling the tests just to put it in the other location? (edit: I misread and thought two methods were used; only one is)
randomInitialClusterState is 5 lines. I think we can duplicate that :)
Okay, I moved it.
List<FailedShard> shardsToFail = new ArrayList<>();
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
logger.info("--> failing shard {}", shardRouting);
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
you're testing only one failure at a time.
Instead, the test could select a subset of the primary shards at random (and also a few replica shards) and fail them in one go.
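A hedged fragment showing that shape: collect a random subset of started shards (roughly half the primaries, plus the occasional replica) and fail them in a single `applyFailedShards` call. The failure message string and the use of `rarely()` are illustrative choices.

```java
List<FailedShard> shardsToFail = new ArrayList<>();
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
    // pick a random subset of primaries, plus a few replicas, to fail in one go
    if ((shardRouting.primary() && randomBoolean()) || rarely()) {
        shardsToFail.add(new FailedShard(shardRouting, "random failure", new Exception()));
    }
}
state = cluster.applyFailedShards(state, shardsToFail);
```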
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
    Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
Can you re-add the comment explaining why we need to consider "null" here?
Re-added this comment
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); | ||
logger.info("--> failing shard {}", shardRouting); | ||
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); | ||
state = cluster.applyFailedShards(state, shardsToFail); |
an alternative to explicit shard failing is to remove nodes where the shards are allocated (i.e. when a node disconnects from the cluster).
This would also test the scenario where DiscoveryNode is null in the RoutingNode.
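A hedged sketch of that alternative: drop the node holding the primary from the cluster state and let `deassociateDeadNodes` clean up. Here `primaryShard` stands for the routing entry of the primary being removed, and the exact `deassociateDeadNodes` signature is an assumption based on the AllocationService method of the same name.

```java
// remove the node holding the primary instead of failing the shard explicitly
String nodeIdToRemove = primaryShard.currentNodeId();
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
        .remove(nodeIdToRemove)
        .build();
state = ClusterState.builder(state).nodes(newNodes).build();

// reroutes and fails the shards that were on the removed node; this also exercises
// the case where the RoutingNode's DiscoveryNode is null
state = cluster.deassociateDeadNodes(state, true, "node left the cluster");
```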
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
unused import?
@ywelsch I pushed a few commits addressing your feedback; thanks again for taking a look at this.
Left a few more minor comments. Address as you see fit.
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
    .add(createNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
this comment is stale (there are no nodes removed here)
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
}

// Log the shard versions (for debugging if necessary)
Log the node versions?
Can also be done directly in the loop where you are adding the nodes :-)
state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}
state = cluster.reroute(state, new ClusterRerouteRequest());
this is not needed. createIndex automatically reroutes.
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}

logger.info("--> state before failing shards: {}", state);
👍
}

private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
    for (ObjectObjectCursor<String, DiscoveryNode> entry : state.getNodes().getDataNodes()) {
No need for iteration here; you can get the node directly by calling `state.getNodes().get(shardRouting.currentNodeId())` (which will return `null` if no node is found).
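In other words, the helper can shrink to a direct lookup. A minimal sketch of the suggested rewrite:

```java
private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
    // look the node up directly by id; it may already have left the cluster
    DiscoveryNode node = state.getNodes().get(shardRouting.currentNodeId());
    return node == null ? null : node.getVersion();
}
```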
.filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null)
.collect(Collectors.toSet());
// If we find a replica and at least another candidate
if (replicaToBePromoted != null && candidates.size() > 0) {
We don't need to determine `replicaToBePromoted`, and the candidates do not need to be filtered with `!s.equals(replicaToBePromoted)`. It's ok to just check `candidates.size() > 0` here to see whether there is going to be a new primary. In that case, we fail the primary plus `random(0, candidates.size() - 1)` replicas and check afterwards that the new primary is on a node whose version is at least as high as that of all remaining replicas.
Thanks @ywelsch, I rewrote the check to be more in line with what we discussed.
* Promote replica on the highest version node

  This changes the replica selection to prefer replicas on the highest version node when choosing a replacement to promote after the primary shard fails. Consider this situation:
  - A replica on a 5.6 node
  - Another replica on a 6.0 node
  - The primary on a 6.0 node

  The primary shard is sending sequence numbers to the replica on the 6.0 node and skipping them for the 5.6 node. Now assume that the primary shard fails and (prior to this change) the replica on the 5.6 node gets promoted to primary: it has no knowledge of sequence numbers, while the replica on the 6.0 node will be expecting sequence numbers but will never receive them.

  Relates to #10708

* Switch from map of node to version to retrieving the version from the node
* Remove uneeded null check
* You can pretend you're a functional language Java, but you're not fooling me.
* Randomize node versions
* Add test with random cluster state with multiple versions that fails shards
* Re-add comment and remove extra import
* Remove unneeded stuff, randomly start replicas a few more times
* Move test into FailedNodeRoutingTests
* Make assertions actually test replica version promotion
* Rewrite test, taking Yannick's feedback into account
* master: (129 commits)
  Add doc note regarding explicit publish host
  Fix typo in name of test
  Add additional test for sequence-number recovery
  WrapperQueryBuilder should also rewrite the parsed query.
  Remove dead code and stale Javadoc
  Update defaults in documentation (#25483)
  [DOCS] Add docs-dir to Painless (#25482)
  Add concurrent deprecation logger test
  [DOCS] Update shared attributes for Elasticsearch (#25479)
  Use LRU set to reduce repeat deprecation messages
  Add NioTransport threads to thread name checks (#25477)
  Add shortcut for AbstractQueryBuilder.parseInnerQueryBuilder to QueryShardContext
  Prevent channel enqueue after selector close (#25478)
  Fix Java 9 compilation issue
  Remove unregistered `transport.netty.*` settings (#25476)
  Handle ping correctly in NioTransport (#25462)
  Tests: Remove platform specific assertion in NioSocketChannelTests
  Remove QueryParseContext from parsing QueryBuilders (#25448)
  Promote replica on the highest version node (#25277)
  test: added not null assertion
  ...