Skip to content

Commit

Permalink
Use controller for status update and account for terminal status
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Nov 5, 2022
1 parent 11e3c81 commit fe59288
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;

/**
* Helper controller class to remove list of nodes from the cluster and update status
*
Expand Down Expand Up @@ -79,42 +81,6 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* Transport call to clear voting config exclusion
*
* @param listener callback for response or failure
*/
public void clearVotingConfigExclusion(ActionListener<Void> listener, boolean waitForRemoval) {
final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest();
clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval);
transportService.sendRequest(
transportService.getLocalNode(),
ClearVotingConfigExclusionsAction.NAME,
clearVotingConfigExclusionsRequest,
new TransportResponseHandler<ClearVotingConfigExclusionsResponse>() {
@Override
public void handleResponse(ClearVotingConfigExclusionsResponse response) {
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
return new ClearVotingConfigExclusionsResponse(in);
}
}
);
}

/**
* This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor}
* Once the tasks are submitted, it waits for an expected cluster state to guarantee
Expand Down Expand Up @@ -201,7 +167,7 @@ public void onTimeout(TimeValue timeout) {
* @param decommissionStatus status to update decommission metadata with
* @param listener listener for response and failure
*/
public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener<DecommissionStatus> listener) {
public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener<DecommissionStatus> listener, boolean isTerminalStatus) {
clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
Expand All @@ -218,9 +184,14 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
);
return ClusterState.builder(currentState)
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
.build();

if (isTerminalStatus) {
newState = clearExclusionsAndGetState(newState);
}
return newState;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING;
import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;
import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes;
import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute;
Expand Down Expand Up @@ -231,7 +233,7 @@ public void onNewClusterState(ClusterState state) {
"unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request";
logger.error(errorMsg);
// will go ahead and clear the voting config and mark the status as failed
clearVotingConfigExclusionAndUpdateStatus(false, false);
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true);
listener.onFailure(new IllegalStateException(errorMsg));
} else {
logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request");
Expand Down Expand Up @@ -269,7 +271,8 @@ public void onTimeout(TimeValue timeout) {
String errorMsg = "timed out [" + timeout.toString() + "while removing to-be-decommissioned cluster manager eligible nodes [" + nodeIdsToBeExcluded.toString() + "] from voting config";
logger.error(errorMsg);
listener.onFailure(new OpenSearchTimeoutException(errorMsg));
clearVotingConfigExclusionAndUpdateStatus(false, false);
// will go ahead and clear the voting config and mark the status as failed
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true);
}
};

Expand Down Expand Up @@ -317,10 +320,11 @@ public void onFailure(Exception e) {
),
e
);
// since we are not able to update the status, we will clear the voting config exclusion we have set earlier
clearVotingConfigExclusionAndUpdateStatus(false, false);
// TODO - this might not be needed
// will go ahead and clear the voting config and mark the status as failed
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true);
}
});
}, false);
}
}

Expand Down Expand Up @@ -360,12 +364,14 @@ public void onResponse(DecommissionStatus status) {
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
clearVotingConfigExclusionAndUpdateStatus(true, true);
// will clear the voting config exclusion and mark the status as successful
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.SUCCESSFUL, statusUpdateListener(), true);
}

@Override
public void onFailure(Exception e) {
clearVotingConfigExclusionAndUpdateStatus(false, false);
// will go ahead and clear the voting config and mark the status as failed
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true);
}
}
);
Expand All @@ -381,32 +387,11 @@ public void onFailure(Exception e) {
),
e
);
// since we are not able to update the status, we will clear the voting config exclusion we have set earlier
clearVotingConfigExclusionAndUpdateStatus(false, false);
// TODO - this might not be needed
// will go ahead and clear the voting config and mark the status as failed
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true);
}
});
}

private void clearVotingConfigExclusionAndUpdateStatus(boolean decommissionSuccessful, boolean waitForRemoval) {
decommissionController.clearVotingConfigExclusion(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.info(
"successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata"
);
DecommissionStatus updateStatusWith = decommissionSuccessful ? DecommissionStatus.SUCCESSFUL : DecommissionStatus.FAILED;
decommissionController.updateMetadataWithDecommissionStatus(updateStatusWith, statusUpdateListener());
}

@Override
public void onFailure(Exception e) {
logger.debug(
new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"),
e
);
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener());
}
}, waitForRemoval);
}, false);
}

private static void validateAwarenessAttribute(
Expand Down Expand Up @@ -533,21 +518,27 @@ public void startRecommissionAction(final ActionListener<DeleteDecommissionState
* Once the excluded nodes have stopped, clear the voting configuration exclusions with DELETE /_cluster/voting_config_exclusions.
* And hence it is safe to remove the exclusion if any. User should make conscious choice before decommissioning awareness attribute.
*/
decommissionController.clearVotingConfigExclusion(new ActionListener<Void>() {
clusterService.submitStateUpdateTask("clear-voting-config-exclusions-during-recommission", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public void onResponse(Void unused) {
logger.info("successfully cleared voting config exclusion for deleting the decommission.");
deleteDecommissionState(listener);
public ClusterState execute(ClusterState currentState) {
return clearExclusionsAndGetState(currentState);
}

@Override
public void onFailure(Exception e) {
public void onFailure(String source, Exception e) {
logger.error("Failure in clearing voting config during delete_decommission request.", e);
listener.onFailure(e);
}
}, false);

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.info("successfully cleared voting config exclusion for deleting the decommission.");
deleteDecommissionState(listener);
}
});
}

// TODO - merge this state update with above call
void deleteDecommissionState(ActionListener<DeleteDecommissionStateResponse> listener) {
clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,7 @@ public void shutdownThreadPoolAndClusterService() {
}

// TODO - Add test for custom exclusion

public void testClearVotingConfigExclusions() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
decommissionController.clearVotingConfigExclusion(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("unexpected failure occurred while clearing voting config exclusion" + e);
}
}, false);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), empty());
}
// TODO - Add test for clear exclusion

public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -278,7 +262,7 @@ public void onFailure(Exception e) {
fail("decommission status update failed");
countDownLatch.countDown();
}
});
}, false);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
ClusterState newState = clusterService.getClusterApplierService().state();
DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata();
Expand Down

0 comments on commit fe59288

Please sign in to comment.