From cad531ecd40e33009524dc602003567d4cf08fb4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 24 May 2018 21:23:44 +0200 Subject: [PATCH] Use remote client in TransportFieldCapsAction (#30838) We now have a remote cluster client exposed which can talk to a given remote cluster and manages reconnects etc. This makes code more readable than using the transport layer directly. --- .../TransportFieldCapabilitiesAction.java | 60 +++++-------------- 1 file changed, 14 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 2313d9f5fc690..2adea56730ee4 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -33,10 +34,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion - remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> { - Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); - FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); - remoteRequest.setMergeResults(false); // we need to merge on this node - remoteRequest.indicesOptions(originalIndices.indicesOptions()); - remoteRequest.indices(originalIndices.indices()); - remoteRequest.fields(request.fields()); - transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - - @Override - public FieldCapabilitiesResponse newInstance() { - return new FieldCapabilitiesResponse(); - } - - @Override - public void handleResponse(FieldCapabilitiesResponse response) { - try { - for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) { - indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware. - buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get())); - } - } finally { - onResponse.run(); - } - } - - @Override - public void handleException(TransportException exp) { - onResponse.run(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - }, e -> onResponse.run())); + Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); + remoteRequest.setMergeResults(false); // we need to merge on this node + remoteRequest.indicesOptions(originalIndices.indicesOptions()); + remoteRequest.indices(originalIndices.indices()); + remoteRequest.fields(request.fields()); + remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> { + for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) { + indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware. + buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get())); + } + onResponse.run(); + }, failure -> onResponse.run())); } - } }