Skip to content

Commit

Permalink
[Feature/extensions] Integrated CreateComponent extensionPoint (#3265)
Browse files Browse the repository at this point in the history
* Draft createComponent extensionPoint

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Integrated cluster state for createComponent

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Decoupled extension points design

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Changed ClusterServiceRequest to generic ExtensionRequest

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* PR comments

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Using ClusterStateResponse

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Rebased

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
  • Loading branch information
owaiskazi19 authored Jun 27, 2022
1 parent 7625fce commit 76bbc9f
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(waitForTimedOut);
}

@Override
public String toString() {
return "ClusterStateResponse{" + "clusterState=" + clusterState + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* PluginSettings Response for Extensibility
*
* @opensearch.internal
*/
public class ClusterSettingsResponse extends TransportResponse {
private final Settings clusterSettings;

public ClusterSettingsResponse(ClusterService clusterService) {
this.clusterSettings = clusterService.getSettings();
}

public ClusterSettingsResponse(StreamInput in) throws IOException {
super(in);
this.clusterSettings = Settings.readSettingsFromStream(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Settings.writeSettingsToStream(clusterSettings, out);
}

@Override
public String toString() {
return "ClusterSettingsResponse{" + "clusterSettings=" + clusterSettings + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterSettingsResponse that = (ClusterSettingsResponse) o;
return Objects.equals(clusterSettings, that.clusterSettings);
}

@Override
public int hashCode() {
return Objects.hash(clusterSettings);
}

}
60 changes: 60 additions & 0 deletions server/src/main/java/org/opensearch/cluster/LocalNodeResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
* LocalNode Response for Extensibility
*
* @opensearch.internal
*/
public class LocalNodeResponse extends TransportResponse {
private final DiscoveryNode localNode;

public LocalNodeResponse(ClusterService clusterService) {
this.localNode = clusterService.localNode();
}

public LocalNodeResponse(StreamInput in) throws IOException {
super(in);
this.localNode = new DiscoveryNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
this.localNode.writeTo(out);
}

@Override
public String toString() {
return "LocalNodeResponse{" + "localNode=" + localNode + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalNodeResponse that = (LocalNodeResponse) o;
return Objects.equals(localNode, that.localNode);
}

@Override
public int hashCode() {
return Objects.hash(localNode);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Objects;

/**
* CLusterService Request for Extensibility
*
* @opensearch.internal
*/
public class ExtensionRequest extends TransportRequest {
private static final Logger logger = LogManager.getLogger(ExtensionRequest.class);
private ExtensionsOrchestrator.RequestType requestType;

public ExtensionRequest(ExtensionsOrchestrator.RequestType requestType) {
this.requestType = requestType;
}

public ExtensionRequest(StreamInput in) throws IOException {
super(in);
this.requestType = in.readEnum(ExtensionsOrchestrator.RequestType.class);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeEnum(requestType);
}

public ExtensionsOrchestrator.RequestType getRequestType() {
return this.requestType;
}

public String toString() {
return "ExtensionRequest{" + "requestType=" + requestType + '}';
}

@Override
public boolean equals(Object o) {

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExtensionRequest that = (ExtensionRequest) o;
return Objects.equals(requestType, that.requestType);
}

@Override
public int hashCode() {
return Objects.hash(requestType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.*;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand All @@ -49,6 +52,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

Expand All @@ -61,20 +65,39 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_EXTENSION_ACTION_NAME = "internal:discovery/extensions";
public static final String INDICES_EXTENSION_POINT_ACTION_NAME = "indices:internal/extensions";
public static final String INDICES_EXTENSION_NAME_ACTION_NAME = "indices:internal/name";
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";

private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);

/**
* Enum for Extension Requests
*
* @opensearch.internal
*/
public static enum RequestType {
REQUEST_EXTENSION_CLUSTER_STATE,
REQUEST_EXTENSION_LOCAL_NODE,
REQUEST_EXTENSION_CLUSTER_SETTINGS,
CREATE_COMPONENT,
ON_INDEX_MODULE,
GET_SETTINGS
};

private final Path extensionsPath;
final Set<DiscoveryExtension> extensionsSet;
Set<DiscoveryExtension> extensionsInitializedSet;
TransportService transportService;
ClusterService clusterService;

public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
logger.info("ExtensionsOrchestrator initialized");
this.extensionsPath = extensionsPath;
this.transportService = null;
this.extensionsSet = new HashSet<DiscoveryExtension>();
this.extensionsInitializedSet = new HashSet<DiscoveryExtension>();

this.clusterService = null;
/*
* Now Discover extensions
*/
Expand All @@ -86,6 +109,34 @@ public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_STATE,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_LOCAL_NODE,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_SETTINGS,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
}

@Override
public PluginsAndModules info() {
return null;
Expand Down Expand Up @@ -187,14 +238,33 @@ public String executor() {
transportService.sendRequest(
extensionNode,
REQUEST_EXTENSION_ACTION_NAME,
new PluginRequest(extensionNode, new ArrayList<DiscoveryExtension>(extensionsSet)),
new PluginRequest(transportService.getLocalNode(), new ArrayList<DiscoveryExtension>(extensionsSet)),
pluginResponseHandler
);
} catch (Exception e) {
logger.error(e.toString());
}
}

TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) {
// Read enum
if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_STATE) {
ClusterStateResponse clusterStateResponse = new ClusterStateResponse(
clusterService.getClusterName(),
clusterService.state(),
false
);
return clusterStateResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_LOCAL_NODE) {
LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService);
return localNodeResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS) {
ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService);
return clusterSettingsResponse;
}
return null;
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionsSet) {
onIndexModule(indexModule, extensionNode);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ protected Node(
* This seems like a chicken and egg problem.
*/
this.extensionsOrchestrator.setTransportService(transportService);
this.extensionsOrchestrator.setClusterService(clusterService);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(
Expand Down Expand Up @@ -1085,7 +1086,6 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
extensionsOrchestrator.extensionsInitialize();

// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
Expand Down Expand Up @@ -1130,6 +1130,7 @@ public Node start() throws NodeValidationException {
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
extensionsOrchestrator.extensionsInitialize();
discovery.startInitialJoin();
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings());
configureNodeAndClusterIdStateListener(clusterService);
Expand Down

0 comments on commit 76bbc9f

Please sign in to comment.