Skip to content

Commit

Permalink
Use remote client in TransportFieldCapsAction (#30838)
Browse files Browse the repository at this point in the history
We now have a remote cluster client exposed which can
talk to a given remote cluster and manages reconnects etc.
This makes code more readable than using the transport layer directly.
  • Loading branch information
s1monw authored May 24, 2018
1 parent d32683e commit cad531e
Showing 1 changed file with 14 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -33,10 +34,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final TransportService transportService;

@Inject
public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
Expand All @@ -62,7 +58,6 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.transportService = transportService;
this.shardAction = shardAction;
}

Expand Down Expand Up @@ -118,47 +113,20 @@ public void onFailure(Exception e) {
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
// if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {

@Override
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}

@Override
public void handleResponse(FieldCapabilitiesResponse response) {
try {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
} finally {
onResponse.run();
}
}

@Override
public void handleException(TransportException exp) {
onResponse.run();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}, e -> onResponse.run()));
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
onResponse.run();
}, failure -> onResponse.run()));
}

}
}

Expand Down

0 comments on commit cad531e

Please sign in to comment.