diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java new file mode 100644 index 0000000000000..96f438d7cd9b5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.repository; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * RemoteStore Repository Registration helper + */ +public class RemoteStoreRepositoryRegistrationHelper { + + public static boolean isRemoteStoreEnabled(ClusterState currentState) { + return currentState.getMetadata().settings().get("cluster.remote_store.segment.repository") != null + && currentState.getMetadata().settings().get("cluster.remote_store.translog.repository") != null; + } + + private static Settings buildSettings(String stringSettings) { + Settings.Builder settings = Settings.builder(); + + String[] stringKeyValue = stringSettings.split(","); + for (int i = 0; i < stringKeyValue.length; i++) { + String[] keyValue = stringKeyValue[i].split(":"); + settings.put(keyValue[0].trim(), keyValue[1].trim()); + } + + return settings.build(); + } + + public static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode node) { + + String name = node.getAttributes().get("remote_store.segment.repository.name"); + String type = node.getAttributes().get("remote_store.segment.repository.type"); + String stringSettings = node.getAttributes().get("remote_store.segment.repository.settings"); + + assert name != null : "Name cannot be null"; + assert type != null : "Type cannot be null"; + assert stringSettings != null : "Settings cannot be null"; + + return new RepositoryMetadata(name, type, buildSettings(stringSettings)); + + } + + public static RepositoryMetadata buildTranslogRepositoryMetadata(DiscoveryNode node) { + String name = node.getAttributes().get("remote_store.translog.repository.name"); + String type = node.getAttributes().get("remote_store.translog.repository.type"); + String stringSettings = node.getAttributes().get("remote_store.translog.repository.settings"); + + assert name != null : "Name cannot be null"; + assert type != null : "Type cannot be null"; + assert stringSettings != null : "Settings cannot be null"; + + return new RepositoryMetadata(name, type, buildSettings(stringSettings)); + } + + synchronized public static ClusterState validateOrAddRemoteStoreRepository(ClusterState currentState, DiscoveryNode node) { + + assert currentState.getMetadata() + .settings() + .get("cluster.remote_store.segment.repository") + .equals(node.getAttributes().get("remote_store.segment.repository.name")); + assert currentState.getMetadata() + .settings() + .get("cluster.remote_store.translog.repository") + .equals(node.getAttributes().get("remote_store.translog.repository.name")); + + boolean segmentStoreRepositoryFound = false; + boolean translogStoreRepositoryFound = false; + RepositoryMetadata newSegmentRepositoryMetadata = buildSegmentRepositoryMetadata(node); + RepositoryMetadata newTranslogRepositoryMetadata = buildTranslogRepositoryMetadata(node); + + RepositoriesMetadata repositories = currentState.metadata().custom(RepositoriesMetadata.TYPE); + if (repositories != null) { + for (RepositoryMetadata existingRepositoryMetadata : repositories.repositories()) { + if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.segment.repository.name"))) { + assert existingRepositoryMetadata.equalsIgnoreGenerations(newSegmentRepositoryMetadata); + segmentStoreRepositoryFound = true; + } + if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.translog.repository.name"))) { + assert existingRepositoryMetadata.equalsIgnoreGenerations(newTranslogRepositoryMetadata); + translogStoreRepositoryFound = true; + } + } + } + + ClusterState.Builder newState = ClusterState.builder(currentState); + + if (!segmentStoreRepositoryFound) { + newState = AddRepositoryInformation(currentState, newSegmentRepositoryMetadata); + currentState = newState.build(); + } + if (!translogStoreRepositoryFound) { + newState = AddRepositoryInformation(currentState, newTranslogRepositoryMetadata); + } + + return newState.build(); + } + + private static ClusterState.Builder AddRepositoryInformation(ClusterState currentState, RepositoryMetadata newRepositoryMetadata) { + RepositoriesService.validate(newRepositoryMetadata.name()); + + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); + if (repositories == null) { + repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata)); + } else { + boolean found = false; + List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); + + for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { + if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { + if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { + // Previous version is the same as this one no update is needed. + return new ClusterState.Builder(currentState); + } + found = true; + repositoriesMetadata.add(newRepositoryMetadata); + } else { + repositoriesMetadata.add(repositoryMetadata); + } + } + if (!found) { + repositoriesMetadata.add(newRepositoryMetadata); + } + repositories = new RepositoriesMetadata(repositoriesMetadata); + } + mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + return ClusterState.builder(currentState).metadata(mdBuilder); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java new file mode 100644 index 0000000000000..a94e80592ef40 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Restore remote store transport handler. */ +package org.opensearch.action.admin.cluster.remotestore.repository; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index de751d881bc0e..f435cb2eb881b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -605,6 +605,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback // we are checking source node commission status here to reject any join request coming from a decommissioned node // even before executing the join task to fail fast JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata()); + + JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation.metadata()); } sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); } else { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 42f09f95a7f56..0176cd5a78db4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -116,7 +116,7 @@ public class JoinHelper { private final ClusterManagerService clusterManagerService; private final TransportService transportService; - private volatile JoinTaskExecutor joinTaskExecutor; + private volatile RemoteStoreJoinTaskExecutor joinTaskExecutor; private final TimeValue joinTimeout; // only used for Zen1 joining private final NodeHealthService nodeHealthService; @@ -125,7 +125,7 @@ public class JoinHelper { private final AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); - private final Supplier joinTaskExecutorGenerator; + private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; private final NamedWriteableRegistry namedWriteableRegistry; private final AtomicReference> serializedState = new AtomicReference<>(); @@ -152,7 +152,7 @@ public class JoinHelper { this.nodeCommissioned = nodeCommissioned; this.namedWriteableRegistry = namedWriteableRegistry; - this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + this.joinTaskExecutorGenerator = () -> new RemoteStoreJoinTaskExecutor(settings, allocationService, logger, rerouteService) { private final long term = currentTermSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 626e47108cc63..d192827bd5954 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -41,6 +41,8 @@ import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; @@ -61,6 +63,8 @@ import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.buildSegmentRepositoryMetadata; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.buildTranslogRepositoryMetadata; /** * Main executor for Nodes joining the OpenSearch cluster @@ -187,6 +191,8 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // we have added the same check in handleJoinRequest method and adding it here as this method // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness ensureNodeCommissioned(node, currentState.metadata()); + + ensureRemoteStoreNodesCompatibility(node, currentState.metadata()); nodesBuilder.add(node); nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); @@ -422,6 +428,29 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) } } + public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode node, Metadata metadata) { + assert metadata.settings() + .get("cluster.remote_store.segment.repository") + .equals(node.getAttributes().get("remote_store.segment.repository.name")); + assert metadata.settings() + .get("cluster.remote_store.translog.repository") + .equals(node.getAttributes().get("remote_store.translog.repository.name")); + + RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); + if (repositories != null && node != null) { + for (RepositoryMetadata existingRepositoryMetadata : repositories.repositories()) { + if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.segment.repository.name"))) { + RepositoryMetadata newSegmentRepositoryMetadata = buildSegmentRepositoryMetadata(node); + existingRepositoryMetadata.equalsIgnoreGenerations(newSegmentRepositoryMetadata); + } + if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.translog.repository.name"))) { + RepositoryMetadata newTranslogRepositoryMetadata = buildTranslogRepositoryMetadata(node); + existingRepositoryMetadata.equalsIgnoreGenerations(newTranslogRepositoryMetadata); + } + } + } + } + public static Collection> addBuiltInJoinValidators( Collection> onJoinValidators ) { @@ -430,6 +459,7 @@ public static Collection> addBuiltInJoin ensureNodesCompatibility(node.getVersion(), state.getNodes()); ensureIndexCompatibility(node.getVersion(), state.getMetadata()); ensureNodeCommissioned(node, state.getMetadata()); + ensureRemoteStoreNodesCompatibility(node, state.metadata()); }); validators.addAll(onJoinValidators); return Collections.unmodifiableCollection(validators); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemoteStoreJoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStoreJoinTaskExecutor.java new file mode 100644 index 0000000000000..abd1e233890cf --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStoreJoinTaskExecutor.java @@ -0,0 +1,55 @@ +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RerouteService; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.common.settings.Settings; + +import java.util.List; + +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreEnabled; + +/** + * Main executor for Remote Store Nodes joining the OpenSearch cluster + * + * @opensearch.internal + */ +public class RemoteStoreJoinTaskExecutor extends JoinTaskExecutor { + public RemoteStoreJoinTaskExecutor( + Settings settings, + AllocationService allocationService, + Logger logger, + RerouteService rerouteService + ) { + super(settings, allocationService, logger, rerouteService); + } + + // TODO: Add verifyRepositoryListener to validate if all the nodes are aware of the repository. + @Override + public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { + ClusterState intermediateState, newState = currentState; + + if (isRemoteStoreEnabled(currentState)) { + for (final Task joinTask : joiningNodes) { + if (joinTask.isBecomeClusterManagerTask() || joinTask.isFinishElectionTask()) { + // noop + } else if (currentState.nodes().nodeExistsWithSameRoles(joinTask.node())) { + // noop + } else { + final DiscoveryNode node = joinTask.node(); + intermediateState = validateOrAddRemoteStoreRepository(newState, node); + newState = intermediateState; + } + } + } + return super.execute(newState, joiningNodes); + } + + /** + * Questions - + * + */ +} diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index e7f7a1d9c0554..28fd15d8caa36 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -598,7 +598,7 @@ private Repository createRepository(RepositoryMetadata repositoryMetadata, Map