Skip to content

Commit

Permalink
Allow weight updates for non decommissioned attribute
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Oct 27, 2022
1 parent 722fbfd commit 976f7da
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -34,6 +35,7 @@

import java.util.List;
import java.util.Locale;
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -70,8 +72,8 @@ public void registerWeightedRoutingMetadata(
clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
// verify currently no decommission action is ongoing
ensureNoOngoingDecommissionAction(currentState);
// verify weights will not be updated for a decommissioned attribute
ensureNoWeightUpdateForDecommissionedAttribute(currentState, request);
Metadata metadata = currentState.metadata();
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE);
Expand Down Expand Up @@ -159,9 +161,12 @@ public void verifyAwarenessAttribute(String attributeName) {
}
}

public void ensureNoOngoingDecommissionAction(ClusterState state) {
public void ensureNoWeightUpdateForDecommissionedAttribute(ClusterState state, ClusterPutWeightedRoutingRequest request) {
DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null && decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false) {
if (decommissionAttributeMetadata != null
&& decommissionAttributeMetadata.status().equals(DecommissionStatus.FAILED) == false
&& Objects.equals(request.getWeightedRouting().attributeName(), decommissionAttributeMetadata.decommissionAttribute().attributeName())
&& Objects.equals(request.getWeightedRouting().weights().get(decommissionAttributeMetadata.decommissionAttribute().attributeValue()), 0.0) == false) {
throw new IllegalStateException(
"a decommission action is ongoing with status ["
+ decommissionAttributeMetadata.status().status()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,36 @@ public void onFailure(Exception e) {}
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testAddWeightedRoutingPassesWhenWeightOfDecommissionedAttributeStillZero() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_A", 0.0, "zone_B", 1.0, "zone_C", 1.0);
DecommissionStatus status = DecommissionStatus.SUCCESSFUL;
ClusterState state = clusterService.state();
state = setWeightedRoutingWeights(state, weights);
state = setDecommissionAttribute(state, status);
ClusterState.Builder builder = ClusterState.builder(state);
ClusterServiceUtils.setState(clusterService, builder);

ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder(
client,
ClusterAddWeightedRoutingAction.INSTANCE
);
Map<String, Double> updatedWeights = Map.of("zone_A", 0.0, "zone_B", 2.0, "zone_C", 1.0);
WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", updatedWeights);
request.setWeightedRouting(updatedWeightedRouting);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
assertTrue(clusterStateUpdateResponse.isAcknowledged());
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {}
};
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
assertEquals(updatedWeightedRouting, clusterService.state().metadata().weightedRoutingMetadata().getWeightedRouting());
}
}

0 comments on commit 976f7da

Please sign in to comment.