Skip to content

Commit

Permalink
Controlling discovery for decommissioned nodes (#4590)
Browse files Browse the repository at this point in the history
* Controlling discovery for decommissioned nodes

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN authored Oct 7, 2022
1 parent 109319e commit 2e4b27b
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
- Improve Gradle pre-commit checks to pre-empt Jenkins build ([#4660](https://github.com/opensearch-project/OpenSearch/pull/4660))
- Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

private final Settings settings;
private final boolean singleNodeDiscovery;
private volatile boolean localNodeCommissioned;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final ClusterManagerService clusterManagerService;
Expand Down Expand Up @@ -218,7 +220,8 @@ public Coordinator(
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService
nodeHealthService,
this::onNodeCommissionStatusChange
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
Expand Down Expand Up @@ -281,6 +284,7 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.localNodeCommissioned = true;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -596,6 +600,9 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion()
);
// we are checking source node commission status here to reject any join request coming from a decommissioned node
// even before executing the join task to fail fast
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
Expand Down Expand Up @@ -1424,6 +1431,17 @@ protected void onFoundPeersUpdated() {
}
}

// package-visible for testing
synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
this.localNodeCommissioned = localNodeCommissioned;
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
return localNodeCommissioned;
}

private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;

Expand All @@ -1450,6 +1468,14 @@ public void run() {
return;
}

// if either the localNodeCommissioned flag or the last accepted state thinks it should skip pre voting, we will
// acknowledge it
if (nodeCommissioned(lastAcceptedState.nodes().getLocalNode(), lastAcceptedState.metadata()) == false
|| localNodeCommissioned == false) {
logger.debug("skip prevoting as local node is decommissioned");
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -57,6 +58,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
Expand All @@ -78,6 +80,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -118,6 +121,7 @@ public class JoinHelper {
private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;

JoinHelper(
Settings settings,
Expand All @@ -130,12 +134,14 @@ public class JoinHelper {
Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
Consumer<Boolean> nodeCommissioned
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

private final long term = currentTermSupplier.getAsLong();
Expand Down Expand Up @@ -342,6 +348,7 @@ public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
nodeCommissioned.accept(true);
onCompletion.run();
}

Expand All @@ -352,6 +359,13 @@ public void handleException(TransportException exp) {
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.logNow();
lastFailedJoinAttempt.set(attempt);
if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) {
logger.info(
"local node is decommissioned [{}]. Will not be able to join the cluster",
exp.getCause().getMessage()
);
nodeCommissioned.accept(false);
}
onCompletion.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -64,6 +61,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

/**
Expand Down Expand Up @@ -196,14 +194,17 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
// we have added the same check in handleJoinRequest method and adding it here as this method
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
if (node.isClusterManagerNode()) {
joiniedNodeNameIds.put(node.getName(), node.getId());
}
} catch (IllegalArgumentException | IllegalStateException e) {
} catch (IllegalArgumentException | IllegalStateException | NodeDecommissionedException e) {
results.failure(joinTask, e);
continue;
}
Expand Down Expand Up @@ -477,22 +478,13 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version
}

public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
// We will let the node join the cluster if the current status is in FAILED state
if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue())
&& (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
decommissionAttribute.toString(),
status.status()
);
}
}
if (nodeCommissioned(node, metadata) == false) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
metadata.decommissionAttributeMetadata().decommissionAttribute().toString(),
metadata.decommissionAttributeMetadata().status().status()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,6 @@ private Set<DiscoveryNode> filterNodesWithDecommissionAttribute(
return nodesWithDecommissionAttribute;
}

private static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) {
return discoveryNode.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue());
}

private static void validateAwarenessAttribute(
final DecommissionAttribute decommissionAttribute,
List<String> awarenessAttributes,
Expand Down Expand Up @@ -531,4 +527,38 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

/**
* Utility method to check if the node has decommissioned attribute
*
* @param discoveryNode node to check on
* @param decommissionAttribute attribute to be checked with
* @return true or false based on whether node has decommissioned attribute
*/
public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) {
String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName());
return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue());
}

/**
* Utility method to check if the node is commissioned or not
*
* @param discoveryNode node to check on
* @param metadata metadata present current which will be used to check the commissioning status of the node
* @return if the node is commissioned or not
*/
public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute)
&& (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) {
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) {
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
Expand Down
29 changes: 28 additions & 1 deletion server/src/main/java/org/opensearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,23 @@ public abstract class PeerFinder {
Setting.Property.NodeScope
);

// the time between attempts to find all peers when node is in decommissioned state, default set to 2 minutes
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting(
"discovery.find_peers_interval_during_decommission",
TimeValue.timeValueSeconds(120L),
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting(
"discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
);

private final TimeValue findPeersInterval;
private final Settings settings;
private TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;

private final Object mutex = new Object();
Expand All @@ -112,6 +121,7 @@ public PeerFinder(
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
Expand All @@ -128,6 +138,23 @@ public PeerFinder(
);
}

public synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
findPeersInterval = localNodeCommissioned
? DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings)
: DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
logger.info(
"setting findPeersInterval to [{}] as node commission status = [{}] for local node [{}]",
findPeersInterval,
localNodeCommissioned,
transportService.getLocalNode()
);
}

// package private for tests
TimeValue getFindPeersInterval() {
return findPeersInterval;
}

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

Expand Down
Loading

0 comments on commit 2e4b27b

Please sign in to comment.