Skip to content

Commit

Permalink
Add ClusterStateRequest parameter to cluster state transport request
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Apr 10, 2023
1 parent e3339e8 commit 73b65ac
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -246,8 +246,8 @@ private void registerRequestHandler() {
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
ClusterStateRequest::new,
((request, channel, task) -> channel.sendResponse(extensionTransportActionsHandler.handleClusterStateRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_SETTINGS,
Expand Down Expand Up @@ -436,8 +436,6 @@ public String executor() {
*/
TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception {
switch (extensionRequest.getRequestType()) {
case REQUEST_EXTENSION_CLUSTER_STATE:
return new ClusterStateResponse(clusterService.getClusterName(), clusterService.state(), false);
case REQUEST_EXTENSION_CLUSTER_SETTINGS:
return new ClusterSettingsResponse(clusterService);
case REQUEST_EXTENSION_ENVIRONMENT_SETTINGS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -31,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -310,4 +314,42 @@ public String executor() {
}
return extensionActionResponse;
}

/**
* Handles a {@link ClusterStateRequest}.
*
* @param request The request to handle.
* @return A {@link ClusterStateResponse}.
*/
public ClusterStateResponse handleClusterStateRequest(ClusterStateRequest request) {
final CompletableFuture<ClusterStateResponse> inProgressFuture = new CompletableFuture<>();
client.execute(ClusterStateAction.INSTANCE, request, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse response) {
inProgressFuture.complete(response);
}

@Override
public void onFailure(Exception exp) {
logger.debug("Cluster State Request failed", exp);
inProgressFuture.completeExceptionally(exp);
}
});
try {
return inProgressFuture.orTimeout(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.debug("No response from extension to request.");
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.env.EnvironmentSettingsResponse;
Expand Down Expand Up @@ -524,9 +523,6 @@ public void testHandleExtensionRequest() throws Exception {
ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir);
initialize(extensionsManager);

ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE);
assertEquals(ClusterStateResponse.class, extensionsManager.handleExtensionRequest(clusterStateRequest).getClass());

ExtensionRequest clusterSettingRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS);
assertEquals(ClusterSettingsResponse.class, extensionsManager.handleExtensionRequest(clusterSettingRequest).getClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -176,4 +178,12 @@ public void testSendTransportRequestToExtension() throws InterruptedException {

expectThrows(NodeNotConnectedException.class, () -> extensionTransportActionsHandler.sendTransportRequestToExtension(request));
}

public void testHandleClusterStateRequest() throws Exception {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all();
assertEquals(
ClusterStateResponse.class,
extensionTransportActionsHandler.handleClusterStateRequest(clusterStateRequest).getClass()
);
}
}

0 comments on commit 73b65ac

Please sign in to comment.