Skip to content

Commit

Permalink
Cancellation support for cat/nodes and optimize it
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
kkewwei committed Sep 14, 2024
1 parent aaa92ae commit 7d42245
Show file tree
Hide file tree
Showing 13 changed files with 605 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862))

### Changed

- Cancellation support for cat/nodes and optimize it ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http;

import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.io.IOException;

import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.hamcrest.Matchers.containsString;

@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
public class HttpCatIT extends HttpSmokeTestCase {

public void testdoCatRequest() throws IOException {
try (RestClient restClient = getRestClient()) {
int nodesCount = restClient.getNodes().size();
assertEquals(5, nodesCount);

// to make sure the timeout is working
for (int i = 0; i < 5; i++) {
sendRequest(restClient, 30, nodesCount);
}

// no timeout
for (int i = 0; i < 5; i++) {
sendRequest(restClient, -1, nodesCount);
}

for (int i = 1; i < 5; i++) {
long timeout = randomInt(300);
sendRequest(restClient, timeout, nodesCount);
}
}
}

private void sendRequest(RestClient restClient, long timeout, int nodesCount) {
Request nodesRequest;
if (timeout < 0) {
nodesRequest = new Request("GET", "/_cat/nodes");
} else {
nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + timeout + "ms");
}
try {
Response response = restClient.performRequest(nodesRequest);
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
String result = EntityUtils.toString(response.getEntity());
String[] NodeInfos = result.split("\n");
assertEquals(nodesCount, NodeInfos.length);
} catch (IOException | ParseException e) {
// it means that it costs too long to get ClusterState from the master.
assertThat(e.getMessage(), containsString("There is not enough time to obtain nodesInfo metric from the cluster manager"));
}
}

}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction;
import org.opensearch.action.admin.cluster.health.ClusterHealthAction;
import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.opensearch.action.admin.cluster.node.Nodes.CatNodesAction;
import org.opensearch.action.admin.cluster.node.Nodes.TransportCatNodesAction;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
import org.opensearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.opensearch.action.admin.cluster.node.info.NodesInfoAction;
Expand Down Expand Up @@ -649,6 +651,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
actions.register(CatNodesAction.INSTANCE, TransportCatNodesAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.ActionType;

/**
* Transport action for cat nodes
*
* @opensearch.internal
*/
public class CatNodesAction extends ActionType<CatNodesResponse> {
public static final CatNodesAction INSTANCE = new CatNodesAction();
public static final String NAME = "cluster:monitor/nodes/cat";

public CatNodesAction() {
super(NAME, CatNodesResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

import java.io.IOException;
import java.util.Map;

/**
* A request of _cat/nodes.
*
* @opensearch.api
*/
public class CatNodesRequest extends ClusterManagerNodeReadRequest<CatNodesRequest> {

private TimeValue cancelAfterTimeInterval;
private long timeout = -1;

public CatNodesRequest() {}

public CatNodesRequest(StreamInput in) throws IOException {
super(in);
}

Check warning on line 35 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L34-L35

Added lines #L34 - L35 were not covered by tests

public void setCancelAfterTimeInterval(TimeValue timeout) {
this.cancelAfterTimeInterval = timeout;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;

Check warning on line 42 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L42

Added line #L42 was not covered by tests
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

Check warning on line 47 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L46-L47

Added lines #L46 - L47 were not covered by tests

public long getTimeout() {
return timeout;

Check warning on line 50 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L50

Added line #L50 was not covered by tests
}

@Override
public ActionRequestValidationException validate() {
return null;

Check warning on line 55 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L55

Added line #L55 was not covered by tests
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);

Check warning on line 60 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java#L60

Added line #L60 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.node.Nodes;

import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A response of a cat shards request.
*
* @opensearch.api
*/
public class CatNodesResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse;
private NodesInfoResponse nodesInfoResponse;
private NodesStatsResponse nodesStatsResponse;

public CatNodesResponse(
ClusterStateResponse clusterStateResponse,
NodesInfoResponse nodesInfoResponse,
NodesStatsResponse nodesStatsResponse
) {
this.clusterStateResponse = clusterStateResponse;
this.nodesInfoResponse = nodesInfoResponse;
this.nodesStatsResponse = nodesStatsResponse;
}

public CatNodesResponse(StreamInput in) throws IOException {
super(in);
}

Check warning on line 43 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java#L42-L43

Added lines #L42 - L43 were not covered by tests

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
nodesInfoResponse.writeTo(out);
nodesStatsResponse.writeTo(out);
}

Check warning on line 50 in server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java#L47-L50

Added lines #L47 - L50 were not covered by tests

public NodesStatsResponse getNodesStatsResponse() {
return nodesStatsResponse;
}

public NodesInfoResponse getNodesInfoResponse() {
return nodesInfoResponse;
}

public ClusterStateResponse getClusterStateResponse() {
return clusterStateResponse;
}
}
Loading

0 comments on commit 7d42245

Please sign in to comment.