diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 788d9fb702254..0f1641463ede2 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -52,6 +52,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; + /** * Helper controller class to remove list of nodes from the cluster and update status * @@ -79,42 +81,6 @@ public class DecommissionController { this.threadPool = threadPool; } - /** - * Transport call to clear voting config exclusion - * - * @param listener callback for response or failure - */ - public void clearVotingConfigExclusion(ActionListener listener, boolean waitForRemoval) { - final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest(); - clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval); - transportService.sendRequest( - transportService.getLocalNode(), - ClearVotingConfigExclusionsAction.NAME, - clearVotingConfigExclusionsRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(ClearVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new ClearVotingConfigExclusionsResponse(in); - } - } - ); - } - /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee @@ -201,7 +167,7 @@ public void onTimeout(TimeValue timeout) { * @param decommissionStatus status to update decommission metadata with * @param listener listener for response and failure */ - public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener) { + public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener, boolean isTerminalStatus) { clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -218,9 +184,14 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.decommissionAttribute(), decommissionStatus ); - return ClusterState.builder(currentState) + ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); + + if (isTerminalStatus) { + newState = clearExclusionsAndGetState(newState); + } + return newState; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 11a0ef5d91a24..5f82f1f3d98cc 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; @@ -43,6 +44,7 @@ import java.util.stream.Collectors; import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; @@ -231,7 +233,7 @@ public void onNewClusterState(ClusterState state) { "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; logger.error(errorMsg); // will go ahead and clear the voting config and mark the status as failed - clearVotingConfigExclusionAndUpdateStatus(false, false); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); listener.onFailure(new IllegalStateException(errorMsg)); } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); @@ -269,7 +271,8 @@ public void onTimeout(TimeValue timeout) { String errorMsg = "timed out [" + timeout.toString() + "while removing to-be-decommissioned cluster manager eligible nodes [" + nodeIdsToBeExcluded.toString() + "] from voting config"; logger.error(errorMsg); listener.onFailure(new OpenSearchTimeoutException(errorMsg)); - clearVotingConfigExclusionAndUpdateStatus(false, false); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } }; @@ -317,10 +320,11 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); + // TODO - this might not be needed + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } - }); + }, false); } } @@ -360,12 +364,14 @@ public void onResponse(DecommissionStatus status) { new ActionListener() { @Override public void onResponse(Void unused) { - clearVotingConfigExclusionAndUpdateStatus(true, true); + // will clear the voting config exclusion and mark the status as successful + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.SUCCESSFUL, statusUpdateListener(), true); } @Override public void onFailure(Exception e) { - clearVotingConfigExclusionAndUpdateStatus(false, false); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } } ); @@ -381,32 +387,11 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); + // TODO - this might not be needed + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } - }); - } - - private void clearVotingConfigExclusionAndUpdateStatus(boolean decommissionSuccessful, boolean waitForRemoval) { - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata" - ); - DecommissionStatus updateStatusWith = decommissionSuccessful ? DecommissionStatus.SUCCESSFUL : DecommissionStatus.FAILED; - decommissionController.updateMetadataWithDecommissionStatus(updateStatusWith, statusUpdateListener()); - } - - @Override - public void onFailure(Exception e) { - logger.debug( - new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"), - e - ); - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); - } - }, waitForRemoval); + }, false); } private static void validateAwarenessAttribute( @@ -533,21 +518,27 @@ public void startRecommissionAction(final ActionListener() { + clusterService.submitStateUpdateTask("clear-voting-config-exclusions-during-recommission", new ClusterStateUpdateTask(Priority.URGENT) { @Override - public void onResponse(Void unused) { - logger.info("successfully cleared voting config exclusion for deleting the decommission."); - deleteDecommissionState(listener); + public ClusterState execute(ClusterState currentState) { + return clearExclusionsAndGetState(currentState); } @Override - public void onFailure(Exception e) { + public void onFailure(String source, Exception e) { logger.error("Failure in clearing voting config during delete_decommission request.", e); listener.onFailure(e); } - }, false); + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("successfully cleared voting config exclusion for deleting the decommission."); + deleteDecommissionState(listener); + } + }); } + // TODO - merge this state update with above call void deleteDecommissionState(ActionListener listener) { clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) { @Override diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 10a4c477b73f1..ace7a136a3dd4 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -131,23 +131,7 @@ public void shutdownThreadPoolAndClusterService() { } // TODO - Add test for custom exclusion - - public void testClearVotingConfigExclusions() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("unexpected failure occurred while clearing voting config exclusion" + e); - } - }, false); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), empty()); - } + // TODO - Add test for clear exclusion public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -278,7 +262,7 @@ public void onFailure(Exception e) { fail("decommission status update failed"); countDownLatch.countDown(); } - }); + }, false); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata();