From 7d4224519ef472f44b798e076fac75a29413ccdc Mon Sep 17 00:00:00 2001 From: kkewwei Date: Fri, 13 Sep 2024 16:02:00 +0800 Subject: [PATCH] Cancellation support for cat/nodes and optimize it Signed-off-by: kkewwei --- CHANGELOG.md | 2 +- .../java/org/opensearch/http/HttpCatIT.java | 68 +++++ .../org/opensearch/action/ActionModule.java | 3 + .../cluster/node/Nodes/CatNodesAction.java | 25 ++ .../cluster/node/Nodes/CatNodesRequest.java | 62 +++++ .../cluster/node/Nodes/CatNodesResponse.java | 63 +++++ .../node/Nodes/TransportCatNodesAction.java | 252 ++++++++++++++++++ .../cluster/node/Nodes/package-info.java | 10 + .../cluster/node/info/NodesInfoRequest.java | 13 + .../cluster/node/stats/NodesStatsRequest.java | 13 + .../support/nodes/BaseNodesRequest.java | 11 + .../rest/action/cat/RestNodesAction.java | 74 ++--- .../node/Nodes/CatNodesResponseTests.java | 57 ++++ 13 files changed, 605 insertions(+), 48 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/TransportCatNodesAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/package-info.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponseTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index ec4ac7504b9c2..d75fd65d0f0c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862)) ### Changed - +- Cancellation support for cat/nodes and optimize it ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853)) ### Deprecated diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java new file mode 100644 index 0000000000000..d4af0ad684a3c --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; + +import java.io.IOException; + +import static org.apache.hc.core5.http.HttpStatus.SC_OK; +import static org.hamcrest.Matchers.containsString; + +@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0) +public class HttpCatIT extends HttpSmokeTestCase { + + public void testdoCatRequest() throws IOException { + try (RestClient restClient = getRestClient()) { + int nodesCount = restClient.getNodes().size(); + assertEquals(5, nodesCount); + + // to make sure the timeout is working + for (int i = 0; i < 5; i++) { + sendRequest(restClient, 30, nodesCount); + } + + // no timeout + for (int i = 0; i < 5; i++) { + sendRequest(restClient, -1, nodesCount); + } + + for (int i = 1; i < 5; i++) { + long timeout = randomInt(300); + sendRequest(restClient, timeout, nodesCount); + } + } + } + + private void sendRequest(RestClient restClient, long timeout, int nodesCount) { + Request nodesRequest; + if (timeout < 0) { + nodesRequest = new Request("GET", "/_cat/nodes"); + } else { + nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + timeout + "ms"); + } + try { + Response response = restClient.performRequest(nodesRequest); + assertEquals(SC_OK, response.getStatusLine().getStatusCode()); + String result = EntityUtils.toString(response.getEntity()); + String[] NodeInfos = result.split("\n"); + assertEquals(nodesCount, NodeInfos.length); + } catch (IOException | ParseException e) { + // it means that it costs too long to get ClusterState from the master. + assertThat(e.getMessage(), containsString("There is not enough time to obtain nodesInfo metric from the cluster manager")); + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index fbf90b97d1e8f..5b0acaca4546c 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -48,6 +48,8 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; import org.opensearch.action.admin.cluster.health.ClusterHealthAction; import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction; +import org.opensearch.action.admin.cluster.node.Nodes.CatNodesAction; +import org.opensearch.action.admin.cluster.node.Nodes.TransportCatNodesAction; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; @@ -649,6 +651,7 @@ public void reg actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class); + actions.register(CatNodesAction.INSTANCE, TransportCatNodesAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesAction.java new file mode 100644 index 0000000000000..dfc4fcb6365b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.Nodes; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cat nodes + * + * @opensearch.internal + */ +public class CatNodesAction extends ActionType { + public static final CatNodesAction INSTANCE = new CatNodesAction(); + public static final String NAME = "cluster:monitor/nodes/cat"; + + public CatNodesAction() { + super(NAME, CatNodesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java new file mode 100644 index 0000000000000..e032430156b0c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesRequest.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.Nodes; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterAdminTask; + +import java.io.IOException; +import java.util.Map; + +/** + * A request of _cat/nodes. + * + * @opensearch.api + */ +public class CatNodesRequest extends ClusterManagerNodeReadRequest { + + private TimeValue cancelAfterTimeInterval; + private long timeout = -1; + + public CatNodesRequest() {} + + public CatNodesRequest(StreamInput in) throws IOException { + super(in); + } + + public void setCancelAfterTimeInterval(TimeValue timeout) { + this.cancelAfterTimeInterval = timeout; + } + + public TimeValue getCancelAfterTimeInterval() { + return cancelAfterTimeInterval; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public long getTimeout() { + return timeout; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java new file mode 100644 index 0000000000000..408e503796d44 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponse.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.Nodes; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A response of a cat shards request. + * + * @opensearch.api + */ +public class CatNodesResponse extends ActionResponse { + + private ClusterStateResponse clusterStateResponse; + private NodesInfoResponse nodesInfoResponse; + private NodesStatsResponse nodesStatsResponse; + + public CatNodesResponse( + ClusterStateResponse clusterStateResponse, + NodesInfoResponse nodesInfoResponse, + NodesStatsResponse nodesStatsResponse + ) { + this.clusterStateResponse = clusterStateResponse; + this.nodesInfoResponse = nodesInfoResponse; + this.nodesStatsResponse = nodesStatsResponse; + } + + public CatNodesResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + clusterStateResponse.writeTo(out); + nodesInfoResponse.writeTo(out); + nodesStatsResponse.writeTo(out); + } + + public NodesStatsResponse getNodesStatsResponse() { + return nodesStatsResponse; + } + + public NodesInfoResponse getNodesInfoResponse() { + return nodesInfoResponse; + } + + public ClusterStateResponse getClusterStateResponse() { + return clusterStateResponse; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/TransportCatNodesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/TransportCatNodesAction.java new file mode 100644 index 0000000000000..7701c579300c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/TransportCatNodesAction.java @@ -0,0 +1,252 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.Nodes; + +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.TimeoutTaskCancellationUtility; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * Perform cat nodes action + * + * @opensearch.internal + */ +public class TransportCatNodesAction extends HandledTransportAction { + public static final long TIMEOUT_THRESHOLD_NANO = 5_000_000; + private final NodeClient client; + + @Inject + public TransportCatNodesAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + super(CatNodesAction.NAME, transportService, actionFilters, CatNodesRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task parentTask, CatNodesRequest catNodesRequest, ActionListener listener) { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear().nodes(true); + clusterStateRequest.local(catNodesRequest.local()); + clusterStateRequest.clusterManagerNodeTimeout(catNodesRequest.clusterManagerNodeTimeout()); + assert parentTask instanceof CancellableTask; + TaskId taskId = new TaskId(client.getLocalNodeId(), parentTask.getId()); + clusterStateRequest.setParentTask(taskId); + ActionListener originalListener = new NotifyOnceListener<>() { + @Override + protected void innerOnResponse(CatNodesResponse catShardsResponse) { + listener.onResponse(catShardsResponse); + } + + @Override + protected void innerOnFailure(Exception e) { + listener.onFailure(e); + } + }; + + ActionListener cancellableListener = TimeoutTaskCancellationUtility.wrapWithCancellationListener( + client, + (CancellableTask) parentTask, + ((CancellableTask) parentTask).getCancellationTimeout(), + originalListener, + e -> { + originalListener.onFailure(e); + } + ); + final long beginTimeNano = System.nanoTime(); + try { + client.admin().cluster().state(clusterStateRequest, new ActionListener<>() { + @Override + public void onResponse(ClusterStateResponse clusterStateResponse) { + long leftTimeNano = -1; + if (catNodesRequest.getTimeout() > 0) { + leftTimeNano = catNodesRequest.getTimeout() - System.nanoTime() + beginTimeNano; + if (leftTimeNano < TIMEOUT_THRESHOLD_NANO) { + onFailure( + new OpenSearchTimeoutException( + "There is not enough time to obtain nodesInfo metric from the cluster manager:" + + clusterStateResponse.getState().nodes().getClusterManagerNode().getName() + ) + ); + return; + } + } + String[] nodeIds = clusterStateResponse.getState().nodes().resolveNodes(); + ConcurrentMap successNodeInfos = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap failNodeInfos = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap successNodeStats = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap failNodeStats = new ConcurrentHashMap<>(nodeIds.length); + AtomicInteger counter = new AtomicInteger(); + for (String nodeId : nodeIds) { + NodesInfoRequest nodesInfoRequest = createNodesInfoRequest(leftTimeNano, nodeId, taskId); + client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() { + @Override + public void onResponse(NodesInfoResponse nodesInfoResponse) { + assert nodesInfoResponse.getNodes().size() + nodesInfoResponse.failures().size() == 1; + NodesStatsRequest nodesStatsRequest = checkAndCreateNodesStatsRequest( + nodesInfoResponse.failures(), + catNodesRequest.getTimeout(), + beginTimeNano, + nodeId, + this::onFailure, + clusterStateResponse.getState().nodes().get(nodeId).getName(), + taskId + ); + if (nodesStatsRequest == null) { + return; + } + successNodeInfos.put(nodeId, nodesInfoResponse.getNodes().get(0)); + client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(NodesStatsResponse nodesStatsResponse) { + assert nodesStatsResponse.getNodes().size() + nodesStatsResponse.failures().size() == 1; + if (nodesStatsResponse.getNodes().size() == 1) { + successNodeStats.put(nodeId, nodesStatsResponse.getNodes().get(0)); + } else { + failNodeStats.put(nodeId, nodesStatsResponse.failures().get(0)); + } + } + + @Override + public void onFailure(Exception e) { + assert e instanceof FailedNodeException; + failNodeStats.put(nodeId, (FailedNodeException) e); + } + }, this::onOperation)); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof FailedNodeException; + failNodeInfos.put(nodeId, (FailedNodeException) e); + onOperation(); + } + + private void onOperation() { + if (counter.incrementAndGet() == nodeIds.length) { + try { + NodesInfoResponse nodesInfoResponse = new NodesInfoResponse( + clusterStateResponse.getClusterName(), + new ArrayList<>(successNodeInfos.values()), + new ArrayList<>(failNodeInfos.values()) + ); + NodesStatsResponse nodesStatsResponse = new NodesStatsResponse( + clusterStateResponse.getClusterName(), + new ArrayList<>(successNodeStats.values()), + new ArrayList<>(failNodeStats.values()) + ); + + CatNodesResponse catNodesResponse = new CatNodesResponse( + clusterStateResponse, + nodesInfoResponse, + nodesStatsResponse + ); + cancellableListener.onResponse(catNodesResponse); + } catch (Exception e) { + e.addSuppressed(e); + logger.error("failed to send failure response", e); + } + } + } + }); + } + } + + @Override + public void onFailure(Exception e) { + cancellableListener.onFailure(e); + } + }); + } catch (Exception e) { + cancellableListener.onFailure(e); + } + } + + private NodesInfoRequest createNodesInfoRequest(long leftTimeNano, String nodeId, TaskId parentTask) { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + if (leftTimeNano > 0) { + nodesInfoRequest.timeout(TimeValue.timeValueNanos(leftTimeNano)); + } + nodesInfoRequest.clear() + .nodesIds(nodeId) + .addMetrics( + NodesInfoRequest.Metric.JVM.metricName(), + NodesInfoRequest.Metric.OS.metricName(), + NodesInfoRequest.Metric.PROCESS.metricName(), + NodesInfoRequest.Metric.HTTP.metricName() + ); + nodesInfoRequest.setShouldCancelOnTimeout(true); + nodesInfoRequest.setParentTask(parentTask); + return nodesInfoRequest; + } + + private NodesStatsRequest checkAndCreateNodesStatsRequest( + List failedNodeExceptions, + long timeoutNano, + long beginTimeNano, + String nodeId, + Consumer failedConsumer, + String nodeName, + TaskId parentTask + ) { + if (failedNodeExceptions.isEmpty() == false) { + failedConsumer.accept(failedNodeExceptions.get(0)); + return null; + } + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + if (timeoutNano > 0) { + long leftTime = timeoutNano - System.nanoTime() + beginTimeNano; + if (leftTime < TIMEOUT_THRESHOLD_NANO) { + failedConsumer.accept( + new FailedNodeException(nodeId, "There is not enough time to obtain nodesStats metric from " + nodeName, null) + ); + return null; + } + nodesStatsRequest.timeout(TimeValue.timeValueMillis(leftTime)); + } + nodesStatsRequest.clear() + .nodesIds(nodeId) + .indices(true) + .addMetrics( + NodesStatsRequest.Metric.JVM.metricName(), + NodesStatsRequest.Metric.OS.metricName(), + NodesStatsRequest.Metric.FS.metricName(), + NodesStatsRequest.Metric.PROCESS.metricName(), + NodesStatsRequest.Metric.SCRIPT.metricName() + ); + nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true); + nodesStatsRequest.setShouldCancelOnTimeout(true); + nodesStatsRequest.setParentTask(parentTask); + return nodesStatsRequest; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/package-info.java new file mode 100644 index 0000000000000..c48492cb0dcdc --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/Nodes/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Cat Nodes transport handlers. */ +package org.opensearch.action.admin.cluster.node.Nodes; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java index 26b53e8db642f..895286bf91368 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -36,10 +36,14 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterAdminTask; +import org.opensearch.tasks.Task; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -165,6 +169,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(requestedMetrics.toArray(new String[0])); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + if (this.getShouldCancelOnTimeout()) { + return new ClusterAdminTask(id, type, action, parentTaskId, headers); + } else { + return super.createTask(id, type, action, parentTaskId, headers); + } + } + /** * An enumeration of the "core" sections of metrics that may be requested * from the nodes information endpoint. Eventually this list will be diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index f1f9f93afdad2..ac0a4388617e3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -37,10 +37,14 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.rest.action.admin.cluster.ClusterAdminTask; +import org.opensearch.tasks.Task; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -190,6 +194,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(requestedMetrics.toArray(new String[0])); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + if (this.getShouldCancelOnTimeout()) { + return new ClusterAdminTask(id, type, action, parentTaskId, headers); + } else { + return super.createTask(id, type, action, parentTaskId, headers); + } + } + /** * An enumeration of the "core" sections of metrics that may be requested * from the nodes stats endpoint. Eventually this list will be pluggable. diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 06f2eed5d228d..313b28acabcad 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -78,6 +78,8 @@ public abstract class BaseNodesRequest private TimeValue timeout; + protected boolean shouldCancelOnTimeout = false; + protected BaseNodesRequest(StreamInput in) throws IOException { super(in); nodesIds = in.readStringArray(); @@ -155,4 +157,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalArray(concreteNodes); out.writeOptionalTimeValue(timeout); } + + public void setShouldCancelOnTimeout(boolean shouldCancelOnTimeout) { + this.shouldCancelOnTimeout = shouldCancelOnTimeout; + } + + public boolean getShouldCancelOnTimeout() { + return this.shouldCancelOnTimeout; + } + } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 26d6b91882cbd..664d6174bb213 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -32,13 +32,13 @@ package org.opensearch.rest.action.cat; +import org.opensearch.action.admin.cluster.node.Nodes.CatNodesAction; +import org.opensearch.action.admin.cluster.node.Nodes.CatNodesRequest; +import org.opensearch.action.admin.cluster.node.Nodes.CatNodesResponse; import org.opensearch.action.admin.cluster.node.info.NodeInfo; -import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.node.DiscoveryNode; @@ -47,6 +47,7 @@ import org.opensearch.common.Table; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.Strings; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -70,7 +71,6 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; -import org.opensearch.rest.action.RestActionListener; import org.opensearch.rest.action.RestResponseListener; import org.opensearch.script.ScriptStats; import org.opensearch.search.suggest.completion.CompletionStats; @@ -81,6 +81,7 @@ import static java.util.Collections.singletonList; import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.search.SearchService.NO_TIMEOUT; /** * _cat API action to get node information @@ -109,55 +110,34 @@ protected void documentation(StringBuilder sb) { @Override public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { - final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear().nodes(true); if (request.hasParam("local")) { deprecationLogger.deprecate("cat_nodes_local_parameter", LOCAL_DEPRECATED_MESSAGE); } - clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); - clusterStateRequest.clusterManagerNodeTimeout( - request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) + final CatNodesRequest catNodesRequest = new CatNodesRequest(); + catNodesRequest.local(request.paramAsBoolean("local", catNodesRequest.local())); + catNodesRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", catNodesRequest.clusterManagerNodeTimeout()) ); - parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName()); + catNodesRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", NO_TIMEOUT)); + if (request.hasParam("timeout")) { + catNodesRequest.setTimeout(TimeValue.parseTimeValue(request.param("timeout"), "timeout").nanos()); + } + + parseDeprecatedMasterTimeoutParameter(catNodesRequest, request, deprecationLogger, getName()); final boolean fullId = request.paramAsBoolean("full_id", false); - return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + return channel -> client.execute(CatNodesAction.INSTANCE, catNodesRequest, new RestResponseListener(channel) { @Override - public void processResponse(final ClusterStateResponse clusterStateResponse) { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.clear() - .addMetrics( - NodesInfoRequest.Metric.JVM.metricName(), - NodesInfoRequest.Metric.OS.metricName(), - NodesInfoRequest.Metric.PROCESS.metricName(), - NodesInfoRequest.Metric.HTTP.metricName() - ); - client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener(channel) { - @Override - public void processResponse(final NodesInfoResponse nodesInfoResponse) { - NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); - nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.clear() - .indices(true) - .addMetrics( - NodesStatsRequest.Metric.JVM.metricName(), - NodesStatsRequest.Metric.OS.metricName(), - NodesStatsRequest.Metric.FS.metricName(), - NodesStatsRequest.Metric.PROCESS.metricName(), - NodesStatsRequest.Metric.SCRIPT.metricName() - ); - nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true); - client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener(channel) { - @Override - public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception { - return RestTable.buildResponse( - buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), - channel - ); - } - }); - } - }); + public RestResponse buildResponse(CatNodesResponse response) throws Exception { + return RestTable.buildResponse( + buildTable( + fullId, + request, + response.getClusterStateResponse(), + response.getNodesInfoResponse(), + response.getNodesStatsResponse() + ), + channel + ); } }); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponseTests.java new file mode 100644 index 0000000000000..e17cfbf77a2f9 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/Nodes/CatNodesResponseTests.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.Nodes; + +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodeRoleGenerator; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CatNodesResponseTests extends OpenSearchTestCase { + + public void testGetAndSetCatNodesResponse() { + ClusterName clusterName = new ClusterName("cluster-1"); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + + Set roles = new HashSet<>(); + String roleName = "test_role"; + DiscoveryNodeRole testRole = DiscoveryNodeRoleGenerator.createDynamicRole(roleName); + roles.add(testRole); + + builder.add(new DiscoveryNode("node-1", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT)); + DiscoveryNodes discoveryNodes = builder.build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.nodes()).thenReturn(discoveryNodes); + + ClusterStateResponse clusterStateResponse = new ClusterStateResponse(clusterName, clusterState, false); + NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(clusterName, Collections.emptyList(), Collections.emptyList()); + NodesStatsResponse nodesStatsResponse = new NodesStatsResponse(clusterName, Collections.emptyList(), Collections.emptyList()); + CatNodesResponse catShardsResponse = new CatNodesResponse(clusterStateResponse, nodesInfoResponse, nodesStatsResponse); + + assertEquals(nodesStatsResponse, catShardsResponse.getNodesStatsResponse()); + assertEquals(nodesInfoResponse, catShardsResponse.getNodesInfoResponse()); + assertEquals(clusterStateResponse, catShardsResponse.getClusterStateResponse()); + + } +}