Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster local health call to throw exception if node is decommissioned or weighed away #6198

Merged
merged 2 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Refactor] Use local opensearch.common.SetOnce instead of lucene's utility class ([#5947](https://github.com/opensearch-project/OpenSearch/pull/5947))
- Cluster health call to throw decommissioned exception for local decommissioned node([#6008](https://github.com/opensearch-project/OpenSearch/pull/6008))
- [Refactor] core.common to new opensearch-common library ([#5976](https://github.com/opensearch-project/OpenSearch/pull/5976))
- Cluster local health call to throw exception if node is decommissioned or weighed away ([#6198](https://github.com/opensearch-project/OpenSearch/pull/6198))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public boolean innerMatch(LogEvent event) {
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.setEnsureNodeWeighedIn(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());
Expand All @@ -329,7 +329,7 @@ public boolean innerMatch(LogEvent event) {
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.setEnsureNodeWeighedIn(true)
.execute()
.actionGet()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -546,4 +547,135 @@ public void testPutAndDeleteWithVersioning() throws Exception {
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}

public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 3 nodes on different zones");
int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");

Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// Check cluster health for weighed in node, health check should return a response with 200 status code
ClusterHealthResponse nodeLocalHealth = client(nodes_in_zone_a.get(0)).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.get();
assertFalse(nodeLocalHealth.isTimedOut());

// Check cluster health for weighed away node, health check should respond with an exception
NodeWeighedAwayException ex = expectThrows(
NodeWeighedAwayException.class,
() -> client(nodes_in_zone_c.get(0)).admin().cluster().prepareHealth().setLocal(true).setEnsureNodeWeighedIn(true).get()
);
assertTrue(ex.getMessage().contains("local node is weighed away"));

logger.info("--> running cluster health on an index that does not exists");
ClusterHealthResponse healthResponse = client(nodes_in_zone_c.get(0)).admin()
.cluster()
.prepareHealth("test1")
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.setTimeout("1s")
.execute()
.actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(true));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
assertThat(healthResponse.getIndices().isEmpty(), equalTo(true));

Set<String> nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

// wait for leader checker to fail
Thread.sleep(13000);

// Check cluster health for weighed in node when cluster manager is not discovered, health check should
// return a response with 200 status code
nodeLocalHealth = client(nodes_in_zone_a.get(0)).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.get();
assertFalse(nodeLocalHealth.isTimedOut());
assertFalse(nodeLocalHealth.hasDiscoveredClusterManager());

// Check cluster health for weighed away node when cluster manager is not discovered, health check should
// return a response with 200 status code with cluster manager discovered as false
// ensure_node_weighed_in is not executed if cluster manager is not discovered
nodeLocalHealth = client(nodes_in_zone_c.get(0)).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.get();
assertFalse(nodeLocalHealth.isTimedOut());
assertFalse(nodeLocalHealth.hasDiscoveredClusterManager());

networkDisruption.stopDisrupting();
Thread.sleep(1000);

// delete weights
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// Check local cluster health
nodeLocalHealth = client(nodes_in_zone_c.get(0)).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeWeighedIn(true)
.get();
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.PreferenceBasedSearchNotAllowedException;
import org.opensearch.cluster.routing.UnsupportedWeightedRoutingStateException;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
Expand Down Expand Up @@ -1640,6 +1641,7 @@ private enum OpenSearchExceptionHandle {
168,
V_2_6_0
),
NODE_WEIGHED_AWAY_EXCEPTION(NodeWeighedAwayException.class, NodeWeighedAwayException::new, 169, V_2_6_0),
INDEX_CREATE_BLOCK_EXCEPTION(
org.opensearch.cluster.block.IndexCreateBlockException.class,
org.opensearch.cluster.block.IndexCreateBlockException::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
private boolean ensureNodeCommissioned = false;
private boolean ensureNodeWeighedIn = false;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'cluster'.
Expand Down Expand Up @@ -100,7 +100,7 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
level = in.readEnum(Level.class);
}
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeCommissioned = in.readBoolean();
ensureNodeWeighedIn = in.readBoolean();
}
}

Expand Down Expand Up @@ -135,7 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(level);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeCommissioned);
out.writeBoolean(ensureNodeWeighedIn);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause BWC failures? If yes we can retain both flags and deprecate the former

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since 2.6.0 is not released yet, do you this BWC is required? I am looking out for test failures in gradle build, will that be sufficient?

}
}

Expand Down Expand Up @@ -321,8 +321,8 @@ public String getAwarenessAttribute() {
return awarenessAttribute;
}

public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommissioned) {
this.ensureNodeCommissioned = ensureNodeCommissioned;
public final ClusterHealthRequest ensureNodeWeighedIn(boolean ensureNodeWeighedIn) {
this.ensureNodeWeighedIn = ensureNodeWeighedIn;
return this;
}

Expand All @@ -331,8 +331,8 @@ public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommi
* @return <code>true</code> if local information is to be returned only when local node is also commissioned
* <code>false</code> to not check local node if commissioned or not for a local request
*/
public final boolean ensureNodeCommissioned() {
return ensureNodeCommissioned;
public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

@Override
Expand All @@ -342,8 +342,8 @@ public ActionRequestValidationException validate() {
} else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) {
return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
}
if (ensureNodeCommissioned && local == false) {
return addValidationError("not a local request to ensure local node commissioned", null);
if (ensureNodeWeighedIn && local == false) {
return addValidationError("not a local request to ensure local node commissioned or weighed in", null);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public ClusterHealthRequestBuilder setLevel(String level) {
/**
* Specifies if the local request should ensure that the local node is commissioned
*/
public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) {
request.ensureNodeCommissioned(ensureNodeCommissioned);
public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNodeCommissioned) {
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -140,12 +143,13 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {
if (request.ensureNodeCommissioned()
if (request.ensureNodeWeighedIn()
&& discovery instanceof Coordinator
&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
return;
}

final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down Expand Up @@ -274,7 +278,18 @@ private void executeHealth(

final Predicate<ClusterState> validationPredicate = newState -> validateRequest(request, newState, waitCount);
if (validationPredicate.test(currentState)) {
listener.onResponse(getResponse(request, currentState, waitCount, TimeoutState.OK));
ClusterHealthResponse clusterHealthResponse = getResponse(request, currentState, waitCount, TimeoutState.OK);
if (request.ensureNodeWeighedIn() && clusterHealthResponse.hasDiscoveredClusterManager()) {
DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
Comment on lines -277 to +283
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that local node is true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we picking up the new state of the cluster and not the one where the observer actually succeeded. IMO, this should be currentState.getNodes().getLocalNode(). Because, what if something has changed in the state and entire health action was ran on a previous state. This might give misleading results

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't understand this completely. Do you mean clusterService.state() may provide new uncommitted state? If that is case, I understand healthcheck wait for cluster state change events to complete and cluster service also has committed cluster state. Let me know if I am missing anything?

assert request.local() == true : "local node request false for request for local node weighed in";
boolean weighedAway = WeightedRoutingUtils.isWeighedAway(localNode.getId(), clusterService.state());
if (weighedAway) {
listener.onFailure(new NodeWeighedAwayException("local node is weighed away"));
return;
}
}

listener.onResponse(clusterHealthResponse);
} else {
final ClusterStateObserver observer = new ClusterStateObserver(
currentState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.SearchShardIterator;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.index.shard.ShardId;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchShardTarget;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
Expand Down Expand Up @@ -58,37 +54,6 @@ private boolean isInternalFailure(Exception exception) {
return false;
}

/**
* This function checks if the shard is present in data node with weighted routing weight set to 0,
* In such cases we fail open, if shard search request for the shard from other shard copies fail with non
* retryable exception.
*
* @param nodeId the node with the shard copy
* @return true if the node has attribute value with shard routing weight set to zero, else false
*/
private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
DiscoveryNode node = clusterState.nodes().get(nodeId);
WeightedRoutingMetadata weightedRoutingMetadata = clusterState.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting != null && weightedRouting.isSet()) {
// Fetch weighted routing attributes with weight set as zero
Stream<String> keys = weightedRouting.weights()
.entrySet()
.stream()
.filter(entry -> entry.getValue().intValue() == WeightedRoutingMetadata.WEIGHED_AWAY_WEIGHT)
.map(Map.Entry::getKey);

for (Object key : keys.toArray()) {
if (node.getAttributes().get(weightedRouting.attributeName()).equals(key.toString())) {
return true;
}
}
}
}
return false;
}

/**
* This function returns next shard copy to retry search request in case of failure from previous copy returned
* by the iterator. It has the logic to fail open ie request shard copies present in nodes with weighted shard
Expand All @@ -99,7 +64,7 @@ private boolean isWeighedAway(String nodeId, ClusterState clusterState) {
*/
public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterState clusterState, Exception exception) {
SearchShardTarget next = shardIt.nextOrNull();
while (next != null && isWeighedAway(next.getNodeId(), clusterState)) {
while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
Expand All @@ -122,7 +87,7 @@ public SearchShardTarget findNext(final SearchShardIterator shardIt, ClusterStat
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception) {
ShardRouting next = shardsIt.nextOrNull();

while (next != null && isWeighedAway(next.currentNodeId(), clusterState)) {
while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
Expand Down
Loading