From e77e26028bb1700dd49eb8553c276dbf5fafe045 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Sun, 25 Sep 2022 13:55:11 +0530 Subject: [PATCH] Merge to latest Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- .../decommission/DecommissionAttribute.java | 2 +- .../DecommissionAttributeMetadata.java | 48 +-- .../decommission/DecommissionController.java | 332 +++++------------- .../decommission/DecommissionService.java | 252 ++++++------- .../decommission/DecommissionStatus.java | 2 +- .../DecommissioningFailedException.java | 2 +- .../DecommissionControllerTests.java | 160 ++++----- 7 files changed, 324 insertions(+), 474 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttribute.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttribute.java index c1a444c29d170..bf2487a1a0e18 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttribute.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttribute.java @@ -89,4 +89,4 @@ public int hashCode() { public String toString() { return "DecommissionAttribute{" + "attributeName='" + attributeName + '\'' + ", attributeValue='" + attributeValue + '\'' + '}'; } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index 464495f7b95b8..dbb3fea823eb6 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -83,6 +83,10 @@ public DecommissionStatus status() { */ // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) { + // if the current status is the expected status already, we return the same instance + if (newStatus.equals(status)) { + return this; + } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { case IN_PROGRESS: @@ -97,23 +101,23 @@ public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionS break; default: throw new IllegalArgumentException( - "illegal decommission status [" + newStatus.status() + "] requested for updating metadata" + "illegal decommission status [" + newStatus.status() + "] requested for updating metadata" ); } return this; } - protected void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) { + private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) { if (status.equals(expected) == false) { assert false : "can't move decommission status to [" - + next - + "]. current status: [" - + status - + "] (expected [" - + expected - + "])"; + + next + + "]. current status: [" + + status + + "] (expected [" + + expected + + "])"; throw new IllegalStateException( - "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" + "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" ); } status = next; @@ -176,8 +180,8 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) if (attributeType.equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse decommission attribute type [{}], expected object", - attributeType + "failed to parse decommission attribute type [{}], expected object", + attributeType ); } token = parser.nextToken(); @@ -190,8 +194,8 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) value = parser.text(); } else { throw new OpenSearchParseException( - "failed to parse attribute [{}], expected string for attribute value", - fieldName + "failed to parse attribute [{}], expected string for attribute value", + fieldName ); } decommissionAttribute = new DecommissionAttribute(fieldName, value); @@ -205,14 +209,14 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser) } else if ("status".equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new OpenSearchParseException( - "failed to parse status of decommissioning, expected string but found unknown type" + "failed to parse status of decommissioning, expected string but found unknown type" ); } status = DecommissionStatus.fromString(parser.text()); } else { throw new OpenSearchParseException( - "unknown field found [{}], failed to parse the decommission attribute", - currentFieldName + "unknown field found [{}], failed to parse the decommission attribute", + currentFieldName ); } } @@ -242,11 +246,11 @@ public EnumSet context() { * @param params serialization parameters */ public static void toXContent( - DecommissionAttribute decommissionAttribute, - DecommissionStatus status, - String attributeType, - XContentBuilder builder, - ToXContent.Params params + DecommissionAttribute decommissionAttribute, + DecommissionStatus status, + String attributeType, + XContentBuilder builder, + ToXContent.Params params ) throws IOException { builder.startObject(attributeType); builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue()); @@ -258,4 +262,4 @@ public static void toXContent( public String toString() { return Strings.toString(this); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index f37c7229c8dcf..7719012f2f3d7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -18,13 +18,6 @@ import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; -import org.opensearch.action.admin.cluster.node.stats.NodeStats; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsAction; -import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsRequest; -import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -39,7 +32,6 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; -import org.opensearch.http.HttpStats; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; @@ -47,9 +39,7 @@ import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -71,10 +61,10 @@ public class DecommissionController { private final ThreadPool threadPool; DecommissionController( - ClusterService clusterService, - TransportService transportService, - AllocationService allocationService, - ThreadPool threadPool + ClusterService clusterService, + TransportService transportService, + AllocationService allocationService, + ThreadPool threadPool ) { this.clusterService = clusterService; this.transportService = transportService; @@ -90,36 +80,36 @@ public class DecommissionController { */ public void excludeDecommissionedNodesFromVotingConfig(Set nodes, ActionListener listener) { transportService.sendRequest( - transportService.getLocalNode(), - AddVotingConfigExclusionsAction.NAME, - new AddVotingConfigExclusionsRequest( - Strings.EMPTY_ARRAY, - nodes.toArray(String[]::new), - Strings.EMPTY_ARRAY, - TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when - // decommission is triggered - ), - new TransportResponseHandler() { - @Override - public void handleResponse(AddVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } + transportService.getLocalNode(), + AddVotingConfigExclusionsAction.NAME, + new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + nodes.toArray(String[]::new), + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when + // decommission is triggered + ), + new TransportResponseHandler() { + @Override + public void handleResponse(AddVotingConfigExclusionsResponse response) { + listener.onResponse(null); + } - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new AddVotingConfigExclusionsResponse(in); - } + @Override + public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException { + return new AddVotingConfigExclusionsResponse(in); } + } ); } @@ -132,30 +122,30 @@ public void clearVotingConfigExclusion(ActionListener listener, boolean wa final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest(); clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval); transportService.sendRequest( - transportService.getLocalNode(), - ClearVotingConfigExclusionsAction.NAME, - clearVotingConfigExclusionsRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(ClearVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } + transportService.getLocalNode(), + ClearVotingConfigExclusionsAction.NAME, + clearVotingConfigExclusionsRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(ClearVotingConfigExclusionsResponse response) { + listener.onResponse(null); + } - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new ClearVotingConfigExclusionsResponse(in); - } + @Override + public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { + return new ClearVotingConfigExclusionsResponse(in); } + } ); } @@ -170,28 +160,30 @@ public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOExcepti * @param nodesRemovedListener callback for the success or failure */ public synchronized void removeDecommissionedNodes( - Set nodesToBeDecommissioned, - String reason, - TimeValue timeout, - ActionListener nodesRemovedListener + Set nodesToBeDecommissioned, + String reason, + TimeValue timeout, + ActionListener nodesRemovedListener ) { final Map nodesDecommissionTasks = new LinkedHashMap<>( - nodesToBeDecommissioned.size() + nodesToBeDecommissioned.size() ); nodesToBeDecommissioned.forEach(discoveryNode -> { final NodeRemovalClusterStateTaskExecutor.Task task = new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason); nodesDecommissionTasks.put(task, nodeRemovalExecutor); }); + + logger.info("submitting state update task to remove [{}] nodes due to decommissioning", nodesToBeDecommissioned.toString()); clusterService.submitStateUpdateTasks( - "node-decommissioned", - nodesDecommissionTasks, - ClusterStateTaskConfig.build(Priority.URGENT), - nodeRemovalExecutor + "node-decommissioned", + nodesDecommissionTasks, + ClusterStateTaskConfig.build(Priority.URGENT), + nodeRemovalExecutor ); Predicate allDecommissionedNodesRemovedPredicate = clusterState -> { Set intersection = Arrays.stream(clusterState.nodes().getNodes().values().toArray(DiscoveryNode.class)) - .collect(Collectors.toSet()); + .collect(Collectors.toSet()); intersection.retainAll(nodesToBeDecommissioned); return intersection.size() == 0; }; @@ -208,20 +200,24 @@ public void onNewClusterState(ClusterState state) { @Override public void onClusterServiceClose() { logger.warn( - "cluster service closed while waiting for removal of decommissioned nodes [{}]", - nodesToBeDecommissioned.toString() + "cluster service closed while waiting for removal of decommissioned nodes [{}]", + nodesToBeDecommissioned.toString() ); } @Override public void onTimeout(TimeValue timeout) { - logger.info("timed out while waiting for removal of decommissioned nodes [{}]", nodesToBeDecommissioned.toString()); + logger.info( + "timed out [{}] while waiting for removal of decommissioned nodes [{}]", + timeout.toString(), + nodesToBeDecommissioned.toString() + ); nodesRemovedListener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while waiting for removal of decommissioned nodes [{}] to take effect", - timeout.toString(), - nodesToBeDecommissioned.toString() - ) + new OpenSearchTimeoutException( + "timed out [{}] while waiting for removal of decommissioned nodes [{}]", + timeout.toString(), + nodesToBeDecommissioned.toString() + ) ); } }; @@ -235,31 +231,26 @@ public void onTimeout(TimeValue timeout) { /** * This method updates the status in the currently registered metadata. - * This method also validates the status with its previous state before executing the request * * @param decommissionStatus status to update decommission metadata with * @param listener listener for response and failure */ public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener) { - clusterService.submitStateUpdateTask(decommissionStatus.status(), new ClusterStateUpdateTask(Priority.URGENT) { + clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); assert decommissionAttributeMetadata != null && decommissionAttributeMetadata.decommissionAttribute() != null; logger.info( - "attempting to update current decommission status [{}] with expected status [{}]", - decommissionAttributeMetadata.status(), - decommissionStatus + "attempting to update current decommission status [{}] with expected status [{}]", + decommissionAttributeMetadata.status(), + decommissionStatus ); - // if the same state is already registered, we will return the current state as is without making any change - if (decommissionAttributeMetadata.status().equals(decommissionStatus)) { - return currentState; - } // setUpdatedStatus can throw IllegalStateException if the sequence of update is not valid decommissionAttributeMetadata.setUpdatedStatus(decommissionStatus); return ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) - .build(); + .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); } @Override @@ -270,167 +261,10 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); + assert decommissionAttributeMetadata != null; + assert decommissionAttributeMetadata.status().equals(decommissionStatus); listener.onResponse(decommissionAttributeMetadata.status()); } }); } - - public void handleNodesDecommissionRequest( - Set nodesToBeDecommissioned, - List zones, - String reason, - TimeValue timeout, - TimeValue timeoutForNodeDecommission, - ActionListener nodesRemovedListener - ) { - setWeightForDecommissionedZone(zones); - checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, timeoutForNodeDecommission, nodesRemovedListener); - } - - private void setWeightForDecommissionedZone(List zones) { - ClusterState clusterState = clusterService.getClusterApplierService().state(); - - DecommissionAttributeMetadata decommissionAttributeMetadata = clusterState.metadata().custom(DecommissionAttributeMetadata.TYPE); - assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) - : "unexpected status encountered while decommissioning nodes"; - DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - - Map weights = new HashMap<>(); - zones.forEach(zone -> { - if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) { - weights.put(zone, "0"); - } else { - weights.put(zone, "1"); - } - }); - - // WRR API will validate invalid weights - final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest(); - clusterWeightRequest.attributeName("zone"); - clusterWeightRequest.setWRRWeight(weights); - - transportService.sendRequest( - transportService.getLocalNode(), - ClusterPutWRRWeightsAction.NAME, - clusterWeightRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(ClusterPutWRRWeightsResponse response) { - logger.info("Weights are successfully set."); - } - - @Override - public void handleException(TransportException exp) { - // Logging warn message on failure. Should we do Retry? If weights are not set should we fail? - logger.warn("Exception occurred while setting weights.Exception Messages - [{}]", - exp.unwrapCause().getMessage()); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public ClusterPutWRRWeightsResponse read(StreamInput in) throws IOException { - return new ClusterPutWRRWeightsResponse(in); - } - }); - } - - public void checkHttpStatsForDecommissionedNodes( - Set decommissionedNodes, - String reason, - TimeValue timeout, - TimeValue timeoutForNodeDecommission, - ActionListener listener) { - - if (timeoutForNodeDecommission.getSeconds() > 0) { - // Wait for timeout to happen. Log the active connection before decommissioning of nodes. - scheduleDecommissionNodesRequestCheck( - decommissionedNodes, - reason, - timeout, - listener, - timeoutForNodeDecommission); - } else { - getActiveRequestCountOnDecommissionNodes(decommissionedNodes); - removeDecommissionedNodes(decommissionedNodes, reason, timeout, listener); - } - } - - private void logActiveConnections(NodesStatsResponse nodesStatsResponse) { - Map nodeActiveConnectionMap = new HashMap<>(); - List responseNodes = nodesStatsResponse.getNodes(); - for (int i=0; i < responseNodes.size(); i++) { - HttpStats httpStats = responseNodes.get(i).getHttp(); - DiscoveryNode node = responseNodes.get(i).getNode(); - nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen()); - } - logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap); - } - - private void scheduleDecommissionNodesRequestCheck( - Set decommissionedNodes, - String reason, - TimeValue timeout, - ActionListener nodesRemovedListener, - TimeValue timeoutForNodeDecommission) { - transportService.getThreadPool().schedule(new Runnable() { - @Override - public void run() { - // Check for active connections. - getActiveRequestCountOnDecommissionNodes(decommissionedNodes); - removeDecommissionedNodes(decommissionedNodes, reason, timeout, nodesRemovedListener); - } - - @Override - public String toString() { - return ""; - } - }, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME); - } - - private void getActiveRequestCountOnDecommissionNodes(Set decommissionedNodes) { - if(decommissionedNodes == null || decommissionedNodes.isEmpty()) { - return; - } - String[] nodes = decommissionedNodes.stream() - .map(DiscoveryNode::getId) - .toArray(String[]::new); - - if (nodes.length == 0) { - return; - } - - final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes); - nodesStatsRequest.clear(); - nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName()); - - transportService.sendRequest( - transportService.getLocalNode(), - NodesStatsAction.NAME, - nodesStatsRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(NodesStatsResponse response) { - logActiveConnections(response); - } - - @Override - public void handleException(TransportException exp) { - logger.warn("Failure occurred while dumping connection for decommission nodes. [{}]", exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public NodesStatsResponse read(StreamInput in) throws IOException { - return new NodesStatsResponse(in); - } - }); - } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index d8c410529776b..1a0704c5a4ac2 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -49,11 +49,11 @@ * the service makes the best attempt to perform the following task - *
    *
  • Initiates nodes decommissioning by adding custom metadata with the attribute and state as {@link DecommissionStatus#INIT}
  • - *
  • Remove cluster-manager eligible nodes from voting config
  • - *
  • Triggers weigh away for nodes having given awareness attribute to drain. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}
  • - *
  • Once weighed away, the service triggers nodes decommission
  • + *
  • Remove to-be-decommissioned cluster-manager eligible nodes from voting config and wait for its abdication if it is active leader
  • + *
  • Triggers weigh away for nodes having given awareness attribute to drain.
  • + *
  • Once weighed away, the service triggers nodes decommission. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}
  • *
  • Once the decommission is successful, the service clears the voting config and marks the status as {@link DecommissionStatus#SUCCESSFUL}
  • - *
  • If service fails at any step, it would mark the status as {@link DecommissionStatus#FAILED}
  • + *
  • If service fails at any step, it makes best attempt to mark the status as {@link DecommissionStatus#FAILED} and to clear voting config exclusion
  • *
* * @opensearch.internal @@ -71,12 +71,12 @@ public class DecommissionService { @Inject public DecommissionService( - Settings settings, - ClusterSettings clusterSettings, - ClusterService clusterService, - TransportService transportService, - ThreadPool threadPool, - AllocationService allocationService + Settings settings, + ClusterSettings clusterSettings, + ClusterService clusterService, + TransportService transportService, + ThreadPool threadPool, + AllocationService allocationService ) { this.clusterService = clusterService; this.transportService = transportService; @@ -87,8 +87,8 @@ public DecommissionService( setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer( - CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, - this::setForcedAwarenessAttributes + CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, + this::setForcedAwarenessAttributes ); } @@ -110,18 +110,16 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { /** * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} - * or the last known status if not {@link DecommissionStatus#FAILED} - * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager nodes from Voting Configuration + * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * * @param decommissionAttribute register decommission attribute in the metadata request * @param listener register decommission listener */ public void startDecommissionAction( - final DecommissionAttribute decommissionAttribute, - final ActionListener listener, - final TimeValue timeOutForNodeDecommission + final DecommissionAttribute decommissionAttribute, + final ActionListener listener ) { - // register the metadata with status as DECOMMISSION_INIT as first step + // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -133,18 +131,18 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); return ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) - .build(); + .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); } @Override public void onFailure(String source, Exception e) { logger.error( - () -> new ParameterizedMessage( - "failed to start decommission action for attribute [{}]", - decommissionAttribute.toString() - ), - e + () -> new ParameterizedMessage( + "failed to start decommission action for attribute [{}]", + decommissionAttribute.toString() + ), + e ); listener.onFailure(e); } @@ -153,18 +151,41 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); - decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener, timeOutForNodeDecommission); + logger.info( + "registered decommission metadata for attribute [{}] with status [{}]", + decommissionAttributeMetadata.decommissionAttribute(), + decommissionAttributeMetadata.status() + ); + decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener); } }); } private synchronized void decommissionClusterManagerNodes( - final DecommissionAttribute decommissionAttribute, - ActionListener listener, - TimeValue timeOutForNodeDecommission + final DecommissionAttribute decommissionAttribute, + ActionListener listener ) { ClusterState state = clusterService.getClusterApplierService().state(); + // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further + // join the cluster + // and hence in further request lifecycle we are sure that no new to-be-decommission leader will join the cluster Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute(state, decommissionAttribute, true); + logger.info( + "resolved cluster manager eligible nodes [{}] that should be removed from Voting Configuration", + clusterManagerNodesToBeDecommissioned.toString() + ); + + // remove all 'to-be-decommissioned' cluster manager eligible nodes from voting config + Set nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream() + .map(DiscoveryNode::getId) + .collect(Collectors.toSet()); + + final Predicate allNodesRemovedAndAbdicated = clusterState -> { + final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; + }; + ActionListener exclusionListener = new ActionListener() { @Override public void onResponse(Void unused) { @@ -173,7 +194,7 @@ public void onResponse(Void unused) { // this is an unexpected state, as after exclusion of nodes having decommission attribute, // this local node shouldn't have had the decommission attribute. Will send the failure response to the user String errorMsg = - "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; + "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; logger.error(errorMsg); // will go ahead and clear the voting config and mark the status as false clearVotingConfigExclusionAndUpdateStatus(false, false); @@ -182,23 +203,24 @@ public void onResponse(Void unused) { } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration + // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission + // nodes can be part of Voting Config listener.onResponse(new ClusterStateUpdateResponse(true)); - failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission); + failDecommissionedNodes(clusterService.getClusterApplierService().state()); } } else { // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager // this will ensures that request is retried until cluster manager times out logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" + "local node is not eligible to process the request, " + + "throwing NotClusterManagerException to attempt a retry on an eligible node" ); listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) + new NotClusterManagerException( + "node [" + + transportService.getLocalNode().toString() + + "] not eligible to execute decommission request. Will retry until timeout." + ) ); } } @@ -211,32 +233,23 @@ public void onFailure(Exception e) { } }; - // remove all 'to-be-decommissioned' cluster manager eligible nodes from voting config - Set nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream() - .map(DiscoveryNode::getId) - .collect(Collectors.toSet()); - - final Predicate allNodesRemovedAndAbdicated = clusterState -> { - final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) - && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; - }; - if (allNodesRemovedAndAbdicated.test(clusterService.getClusterApplierService().state())) { + if (allNodesRemovedAndAbdicated.test(state)) { exclusionListener.onResponse(null); } else { + logger.debug("sending transport request to remove nodes [{}] from voting config", nodeIdsToBeExcluded.toString()); // send a transport request to exclude to-be-decommissioned cluster manager eligible nodes from voting config decommissionController.excludeDecommissionedNodesFromVotingConfig(nodeIdsToBeExcluded, new ActionListener() { @Override public void onResponse(Void unused) { logger.info( - "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", - clusterManagerNodesToBeDecommissioned.toString() + "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", + clusterManagerNodesToBeDecommissioned.toString() ); final ClusterStateObserver abdicationObserver = new ClusterStateObserver( - clusterService, - TimeValue.timeValueSeconds(30L), - logger, - threadPool.getThreadContext() + clusterService, + TimeValue.timeValueSeconds(60L), + logger, + threadPool.getThreadContext() ); final ClusterStateObserver.Listener abdicationListener = new ClusterStateObserver.Listener() { @Override @@ -257,27 +270,32 @@ public void onTimeout(TimeValue timeout) { logger.info("timed out while waiting for abdication of to-be-decommissioned leader"); clearVotingConfigExclusionAndUpdateStatus(false, false); listener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while waiting for abdication of to-be-decommissioned leader", - timeout.toString() - ) + new OpenSearchTimeoutException( + "timed out [{}] while waiting for abdication of to-be-decommissioned leader", + timeout.toString() + ) ); } }; // In case the cluster state is already processed even before this code is executed // therefore testing first before attaching the listener - if (allNodesRemovedAndAbdicated.test(clusterService.getClusterApplierService().state())) { - abdicationListener.onNewClusterState(clusterService.getClusterApplierService().state()); + ClusterState currentState = clusterService.getClusterApplierService().state(); + if (allNodesRemovedAndAbdicated.test(currentState)) { + abdicationListener.onNewClusterState(currentState); } else { + logger.debug("waiting to abdicate to-be-decommissioned leader"); abdicationObserver.waitForNextChange(abdicationListener, allNodesRemovedAndAbdicated); } } @Override public void onFailure(Exception e) { - logger.debug( - new ParameterizedMessage("failure in removing decommissioned cluster manager eligible nodes from voting config"), - e + logger.error( + new ParameterizedMessage( + "failure in removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + nodeIdsToBeExcluded.toString() + ), + e ); exclusionListener.onFailure(e); } @@ -285,48 +303,42 @@ public void onFailure(Exception e) { } } - private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDecommission) { + private void failDecommissionedNodes(ClusterState state) { // this method ensures no matter what, we always exit from this function after clearing the voting config exclusion DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - - // Awareness values refers to all zones in the cluster - List awarenessValues = forcedAwarenessAttributes.get(decommissionAttribute.attributeName()); - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.IN_PROGRESS, new ActionListener<>() { @Override public void onResponse(DecommissionStatus status) { - logger.info("updated the decommission status to [{}]", status.toString()); - - decommissionController.handleNodesDecommissionRequest( - filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false), - awarenessValues, - "nodes-decommissioned", - TimeValue.timeValueSeconds(30L), // TODO - read timeout from request while integrating with API - timeOutForNodeDecommission, - new ActionListener() { - @Override - public void onResponse(Void unused) { - clearVotingConfigExclusionAndUpdateStatus(true, true); - } - - @Override - public void onFailure(Exception e) { - clearVotingConfigExclusionAndUpdateStatus(false, false); - } + logger.info("updated the decommission status to [{}]", status); + // execute nodes decommissioning + decommissionController.removeDecommissionedNodes( + filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false), + "nodes-decommissioned", + TimeValue.timeValueSeconds(120L), + new ActionListener() { + @Override + public void onResponse(Void unused) { + clearVotingConfigExclusionAndUpdateStatus(true, true); + } + + @Override + public void onFailure(Exception e) { + clearVotingConfigExclusionAndUpdateStatus(false, false); } + } ); } @Override public void onFailure(Exception e) { logger.error( - () -> new ParameterizedMessage( - "failed to update decommission status for attribute [{}] to [{}]", - decommissionAttribute.toString(), - DecommissionStatus.IN_PROGRESS - ), - e + () -> new ParameterizedMessage( + "failed to update decommission status for attribute [{}] to [{}]", + decommissionAttribute.toString(), + DecommissionStatus.IN_PROGRESS + ), + e ); // since we are not able to update the status, we will clear the voting config exclusion we have set earlier clearVotingConfigExclusionAndUpdateStatus(false, false); @@ -339,7 +351,7 @@ private void clearVotingConfigExclusionAndUpdateStatus(boolean decommissionSucce @Override public void onResponse(Void unused) { logger.info( - "successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata" + "successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata" ); DecommissionStatus updateStatusWith = decommissionSuccessful ? DecommissionStatus.SUCCESSFUL : DecommissionStatus.FAILED; decommissionController.updateMetadataWithDecommissionStatus(updateStatusWith, statusUpdateListener()); @@ -348,8 +360,8 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) { logger.debug( - new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"), - e + new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"), + e ); decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } @@ -357,14 +369,14 @@ public void onFailure(Exception e) { } private Set filterNodesWithDecommissionAttribute( - ClusterState clusterState, - DecommissionAttribute decommissionAttribute, - boolean onlyClusterManagerNodes + ClusterState clusterState, + DecommissionAttribute decommissionAttribute, + boolean onlyClusterManagerNodes ) { Set nodesWithDecommissionAttribute = new HashSet<>(); Iterator nodesIter = onlyClusterManagerNodes - ? clusterState.nodes().getClusterManagerNodes().valuesIt() - : clusterState.nodes().getNodes().valuesIt(); + ? clusterState.nodes().getClusterManagerNodes().valuesIt() + : clusterState.nodes().getNodes().valuesIt(); while (nodesIter.hasNext()) { final DiscoveryNode node = nodesIter.next(); @@ -380,9 +392,9 @@ private static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNod } private static void validateAwarenessAttribute( - final DecommissionAttribute decommissionAttribute, - List awarenessAttributes, - Map> forcedAwarenessAttributes + final DecommissionAttribute decommissionAttribute, + List awarenessAttributes, + Map> forcedAwarenessAttributes ) { String msg = null; if (awarenessAttributes == null) { @@ -394,9 +406,9 @@ private static void validateAwarenessAttribute( } else if (forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) { msg = "forced awareness attribute [" + forcedAwarenessAttributes.toString() + "] doesn't have the decommissioning attribute"; } else if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName()) - .contains(decommissionAttribute.attributeValue()) == false) { - msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission"; - } + .contains(decommissionAttribute.attributeValue()) == false) { + msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission"; + } if (msg != null) { throw new DecommissioningFailedException(decommissionAttribute, msg); @@ -404,8 +416,8 @@ private static void validateAwarenessAttribute( } private static void ensureEligibleRequest( - DecommissionAttributeMetadata decommissionAttributeMetadata, - DecommissionAttribute requestedDecommissionAttribute + DecommissionAttributeMetadata decommissionAttributeMetadata, + DecommissionAttribute requestedDecommissionAttribute ) { String msg = null; if (decommissionAttributeMetadata != null) { @@ -422,7 +434,7 @@ private static void ensureEligibleRequest( break; default: throw new IllegalStateException( - "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" + "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" ); } } else { @@ -430,21 +442,21 @@ private static void ensureEligibleRequest( case SUCCESSFUL: // one awareness attribute is already decommissioned. We will reject the new request msg = "one awareness attribute [" - + decommissionAttributeMetadata.decommissionAttribute().toString() - + "] already successfully decommissioned, recommission before triggering another decommission"; + + decommissionAttributeMetadata.decommissionAttribute().toString() + + "] already successfully decommissioned, recommission before triggering another decommission"; break; case IN_PROGRESS: case INIT: // it means the decommission has been initiated or is inflight. In that case, will fail new request msg = "there's an inflight decommission request for attribute [" - + decommissionAttributeMetadata.decommissionAttribute().toString() - + "] is in progress, cannot process this request"; + + decommissionAttributeMetadata.decommissionAttribute().toString() + + "] is in progress, cannot process this request"; break; case FAILED: break; default: throw new IllegalStateException( - "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" + "unknown status [" + decommissionAttributeMetadata.status() + "] currently registered in metadata" ); } } @@ -456,16 +468,16 @@ private static void ensureEligibleRequest( } private ActionListener statusUpdateListener() { - return new ActionListener() { + return new ActionListener<>() { @Override public void onResponse(DecommissionStatus status) { - logger.info("updated the decommission status to [{}]", status.toString()); + logger.info("updated the decommission status to [{}]", status); } @Override public void onFailure(Exception e) { - logger.error("unexpected failure during status update", e); + logger.error("unexpected failure occurred during decommission status update", e); } }; } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java index b4f2e092d1336..af88b0d0f5902 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java @@ -65,4 +65,4 @@ public static DecommissionStatus fromString(String status) { } throw new IllegalStateException("Decommission status [" + status + "] not recognized."); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissioningFailedException.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissioningFailedException.java index 589d2b1c247f4..fe1b9368ac712 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissioningFailedException.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissioningFailedException.java @@ -52,4 +52,4 @@ public void writeTo(StreamOutput out) throws IOException { public DecommissionAttribute decommissionAttribute() { return decommissionAttribute; } -} \ No newline at end of file +} diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 56cf8cd35b1e5..8b5343184dabd 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -86,12 +86,12 @@ public void setTransportServiceAndDefaultClusterState() { setState(clusterService, builder); final MockTransport transport = new MockTransport(); transportService = transport.createTransportService( - Settings.EMPTY, - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundTransportAddress -> clusterService.state().nodes().get("node1"), - null, - emptySet() + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> clusterService.state().nodes().get("node1"), + null, + emptySet() ); final Settings.Builder nodeSettingsBuilder = Settings.builder(); @@ -99,21 +99,21 @@ public void setTransportServiceAndDefaultClusterState() { clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); new TransportAddVotingConfigExclusionsAction( - nodeSettings, - clusterSettings, - transportService, - clusterService, - threadPool, - new ActionFilters(emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) + nodeSettings, + clusterSettings, + transportService, + clusterService, + threadPool, + new ActionFilters(emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) ); // registers action new TransportClearVotingConfigExclusionsAction( - transportService, - clusterService, - threadPool, - new ActionFilters(emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) + transportService, + clusterService, + threadPool, + new ActionFilters(emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)) ); // registers action transportService.start(); @@ -178,26 +178,26 @@ public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws In nodesToBeRemoved.add(clusterService.state().nodes().get("node15")); decommissionController.removeDecommissionedNodes( - nodesToBeRemoved, - "unit-test", - TimeValue.timeValueSeconds(30L), - new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("there shouldn't have been any failure"); - } + nodesToBeRemoved, + "unit-test", + TimeValue.timeValueSeconds(30L), + new ActionListener() { + @Override + public void onResponse(Void unused) { + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("there shouldn't have been any failure"); } + } ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); // test all 5 nodes removed and cluster has 10 nodes Set nodes = StreamSupport.stream(clusterService.getClusterApplierService().state().nodes().spliterator(), false) - .collect(Collectors.toSet()); + .collect(Collectors.toSet()); assertEquals(nodes.size(), 10); // test no nodes part of zone-3 for (DiscoveryNode node : nodes) { @@ -214,22 +214,22 @@ public void testTimesOut() throws InterruptedException { nodesToBeRemoved.add(clusterService.state().nodes().get("node14")); nodesToBeRemoved.add(clusterService.state().nodes().get("node15")); decommissionController.removeDecommissionedNodes( - nodesToBeRemoved, - "unit-test-timeout", - TimeValue.timeValueMillis(2), - new ActionListener() { - @Override - public void onResponse(Void unused) { - fail("response shouldn't have been called"); - } - - @Override - public void onFailure(Exception e) { - assertThat(e, instanceOf(OpenSearchTimeoutException.class)); - assertThat(e.getMessage(), containsString("waiting for removal of decommissioned nodes")); - countDownLatch.countDown(); - } + nodesToBeRemoved, + "unit-test-timeout", + TimeValue.timeValueMillis(2), + new ActionListener() { + @Override + public void onResponse(Void unused) { + fail("response shouldn't have been called"); } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(OpenSearchTimeoutException.class)); + assertThat(e.getMessage(), containsString("waiting for removal of decommissioned nodes")); + countDownLatch.countDown(); + } + } ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -237,8 +237,8 @@ public void onFailure(Exception e) { public void testSuccessfulDecommissionStatusMetadataUpdate() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( - new DecommissionAttribute("zone", "zone-1"), - DecommissionStatus.IN_PROGRESS + new DecommissionAttribute("zone", "zone-1"), + DecommissionStatus.IN_PROGRESS ); ClusterState state = clusterService.state(); Metadata metadata = state.metadata(); @@ -248,19 +248,19 @@ public void testSuccessfulDecommissionStatusMetadataUpdate() throws InterruptedE setState(clusterService, state); decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.SUCCESSFUL, - new ActionListener() { - @Override - public void onResponse(DecommissionStatus status) { - assertEquals(DecommissionStatus.SUCCESSFUL, status); - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("decommission status update failed"); - } + DecommissionStatus.SUCCESSFUL, + new ActionListener() { + @Override + public void onResponse(DecommissionStatus status) { + assertEquals(DecommissionStatus.SUCCESSFUL, status); + countDownLatch.countDown(); } + + @Override + public void onFailure(Exception e) { + fail("decommission status update failed"); + } + } ); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); @@ -286,16 +286,16 @@ public ClusterState execute(ClusterState currentState) { currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId())); final CoordinationMetadata.VotingConfiguration votingConfiguration = new CoordinationMetadata.VotingConfiguration( - votingNodeIds + votingNodeIds ); return builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .coordinationMetadata( - CoordinationMetadata.builder(currentState.coordinationMetadata()) - .lastAcceptedConfiguration(votingConfiguration) - .lastCommittedConfiguration(votingConfiguration) - .build() - ) + Metadata.builder(currentState.metadata()) + .coordinationMetadata( + CoordinationMetadata.builder(currentState.coordinationMetadata()) + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration) + .build() + ) ).build(); } @@ -339,18 +339,18 @@ private ClusterState setLocalNodeAsClusterManagerNode(ClusterState clusterState, private ClusterState setThreeNodesInVotingConfig(ClusterState clusterState) { final CoordinationMetadata.VotingConfiguration votingConfiguration = CoordinationMetadata.VotingConfiguration.of( - clusterState.nodes().get("node1"), - clusterState.nodes().get("node6"), - clusterState.nodes().get("node11") + clusterState.nodes().get("node1"), + clusterState.nodes().get("node6"), + clusterState.nodes().get("node11") ); Metadata.Builder builder = Metadata.builder() - .coordinationMetadata( - CoordinationMetadata.builder() - .lastAcceptedConfiguration(votingConfiguration) - .lastCommittedConfiguration(votingConfiguration) - .build() - ); + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration) + .build() + ); clusterState = ClusterState.builder(clusterState).metadata(builder).build(); return clusterState; } @@ -360,6 +360,6 @@ private static DiscoveryNode newNode(String nodeId, Map attribut } final private static Set CLUSTER_MANAGER_DATA_ROLE = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)) + new HashSet<>(Arrays.asList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)) ); -} \ No newline at end of file +}