Skip to content

Commit

Permalink
[Remote Store] Changes to introduce repository registration via node …
Browse files Browse the repository at this point in the history
…attributes

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot committed Aug 17, 2023
1 parent 6a5b464 commit 5ada3cd
Show file tree
Hide file tree
Showing 6 changed files with 694 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.repository;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/**
 * Helper for registering remote store repositories that are described by node attributes.
 * <p>
 * A node names its segment and translog repositories through the
 * {@link #REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY} and
 * {@link #REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY} attributes. For each named repository the
 * type and settings are read from attributes whose keys are built from
 * {@link #REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT} and
 * {@link #REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT}.
 */
public class RemoteStoreRepositoryRegistrationHelper {

    public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.segment";
    public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.translog";
    public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
    public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.settings";

    /** Builds the attribute key that holds the type of the repository called {@code repositoryName}. */
    private static String repositoryTypeAttributeKey(String repositoryName) {
        // Locale.ROOT keeps key formatting independent of the JVM default locale.
        return String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, repositoryName);
    }

    /** Builds the attribute key that holds the settings of the repository called {@code repositoryName}. */
    private static String repositorySettingsAttributeKey(String repositoryName) {
        return String.format(Locale.ROOT, REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, repositoryName);
    }

    /**
     * Ensures the given node carries a non-empty value for {@code attributeKey}.
     *
     * @param joiningNode node whose attributes are inspected
     * @param attributeKey key of the attribute that must be present
     * @throws IllegalStateException if the attribute is missing or empty
     */
    private static void validateAttributeNonNull(DiscoveryNode joiningNode, String attributeKey) {
        String attributeValue = joiningNode.getAttributes().get(attributeKey);
        if (attributeValue == null || attributeValue.isEmpty()) {
            throw new IllegalStateException("joining node [" + joiningNode + "] doesn't have the node attribute [" + attributeKey + "]");
        }
    }

    /**
     * A node is declared a remote store node if it has any of the remote store node attributes. Once any
     * one of them is present, all of them are required to be present and non-empty.
     *
     * @param joiningNode node whose attributes are inspected
     * @return {@code true} if the node has the remote store node attributes, {@code false} if it has none of them
     * @throws IllegalStateException if the node has some, but not all, of the remote store node attributes
     */
    public static boolean isRemoteStoreNode(DiscoveryNode joiningNode) {
        Map<String, String> joiningNodeAttributes = joiningNode.getAttributes();

        // When a repository name attribute is absent the derived keys are formatted with the literal "null";
        // they then simply do not match any attribute and the name validation below reports the problem.
        String segmentRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
        String segmentRepositoryTypeAttributeKey = repositoryTypeAttributeKey(segmentRepositoryName);
        String segmentRepositorySettingsAttributeKey = repositorySettingsAttributeKey(segmentRepositoryName);

        String translogRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
        String translogRepositoryTypeAttributeKey = repositoryTypeAttributeKey(translogRepositoryName);
        String translogRepositorySettingsAttributeKey = repositorySettingsAttributeKey(translogRepositoryName);

        boolean remoteStoreNode = segmentRepositoryName != null
            || joiningNodeAttributes.get(segmentRepositoryTypeAttributeKey) != null
            || joiningNodeAttributes.get(segmentRepositorySettingsAttributeKey) != null
            || translogRepositoryName != null
            || joiningNodeAttributes.get(translogRepositoryTypeAttributeKey) != null
            || joiningNodeAttributes.get(translogRepositorySettingsAttributeKey) != null;

        if (remoteStoreNode) {
            validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
            validateAttributeNonNull(joiningNode, segmentRepositoryTypeAttributeKey);
            validateAttributeNonNull(joiningNode, segmentRepositorySettingsAttributeKey);
            validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
            validateAttributeNonNull(joiningNode, translogRepositoryTypeAttributeKey);
            validateAttributeNonNull(joiningNode, translogRepositorySettingsAttributeKey);
        }

        return remoteStoreNode;
    }

    /**
     * Checks that both nodes hold the same value for {@code attributeKey}.
     *
     * @throws IllegalStateException if the values differ, including when only one node has the attribute
     */
    private static void compareAttribute(DiscoveryNode joiningNode, DiscoveryNode existingNode, String attributeKey) {
        String joiningNodeAttribute = joiningNode.getAttributes().get(attributeKey);
        String existingNodeAttribute = existingNode.getAttributes().get(attributeKey);

        // Null-safe comparison: the existing node is not guaranteed to carry the attribute, and a missing
        // value must surface as the descriptive mismatch below rather than as a NullPointerException.
        if (Objects.equals(existingNodeAttribute, joiningNodeAttribute) == false) {
            throw new IllegalStateException(
                "joining node ["
                    + joiningNode
                    + "] has node attribute ["
                    + attributeKey
                    + "] value ["
                    + joiningNodeAttribute
                    + "] which is different than existing node ["
                    + existingNode
                    + "] value ["
                    + existingNodeAttribute
                    + "]"
            );
        }
    }

    // TODO: See a better way to compare the remote store node attributes.
    /**
     * Compares the remote store attributes of a joining node with those of an existing node. The repository
     * names used to derive the type and settings attribute keys are taken from the existing node.
     *
     * @param joiningNode node requesting to join
     * @param existingNode node already in the cluster that acts as the reference
     * @throws IllegalStateException if any of the compared attributes differ
     */
    public static void compareNodeAttributes(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
        String segmentRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
        String translogRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);

        compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
        compareAttribute(joiningNode, existingNode, repositoryTypeAttributeKey(segmentRepositoryName));
        compareAttribute(joiningNode, existingNode, repositorySettingsAttributeKey(segmentRepositoryName));
        compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
        compareAttribute(joiningNode, existingNode, repositoryTypeAttributeKey(translogRepositoryName));
        compareAttribute(joiningNode, existingNode, repositorySettingsAttributeKey(translogRepositoryName));
    }

    /**
     * Parses a comma separated list of {@code key:value} entries into {@link Settings}. Keys and values are
     * trimmed of surrounding whitespace.
     *
     * @param stringSettings settings in the form {@code key1:value1,key2:value2}
     * @return the parsed settings
     * @throws IllegalStateException if an entry contains no {@code :} separator
     */
    private static Settings buildSettings(String stringSettings) {
        Settings.Builder settings = Settings.builder();

        for (String entry : stringSettings.split(",")) {
            // Split on the first ':' only so that values which themselves contain ':' are kept intact.
            int separatorIndex = entry.indexOf(':');
            if (separatorIndex < 0) {
                throw new IllegalStateException("repository setting [" + entry.trim() + "] is not in the expected [key:value] format");
            }
            settings.put(entry.substring(0, separatorIndex).trim(), entry.substring(separatorIndex + 1).trim());
        }

        return settings.build();
    }

    // TODO: Add logic to mark these repository as System Repository once thats merged.
    // Visible For testing
    /**
     * Builds the {@link RepositoryMetadata} for the repository called {@code name} from the type and settings
     * attributes of the given node.
     *
     * @param node node supplying the repository attributes
     * @param name repository name
     * @return repository metadata described by the node attributes
     * @throws IllegalStateException if the type or settings attribute is missing, empty or malformed
     */
    public static RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
        String typeAttributeKey = repositoryTypeAttributeKey(name);
        String settingsAttributeKey = repositorySettingsAttributeKey(name);

        validateAttributeNonNull(node, typeAttributeKey);
        validateAttributeNonNull(node, settingsAttributeKey);

        Map<String, String> attributes = node.getAttributes();
        return new RepositoryMetadata(name, attributes.get(typeAttributeKey), buildSettings(attributes.get(settingsAttributeKey)));
    }

    /**
     * Validates or adds the remote store repositories to the cluster state.
     * <p>
     * When the cluster state has no nodes, the segment and translog repositories described by the joining
     * node are added. Otherwise the joining node's remote store attributes are compared with those of the
     * first existing node and the state is returned unchanged.
     *
     * @param joiningNode node requesting to join
     * @param currentState cluster state to validate against or extend
     * @return the cluster state with the repositories added, or {@code currentState} itself if nothing changed
     * @throws IllegalStateException if required attributes are missing or differ from the existing ones
     */
    public static ClusterState validateOrAddRemoteStoreRepository(DiscoveryNode joiningNode, ClusterState currentState) {
        List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values());

        if (existingNodes.isEmpty() == false) {
            compareNodeAttributes(joiningNode, existingNodes.get(0));
            // Nothing was modified, so hand back the input instance instead of an identical copy.
            // NOTE(review): callers that detect changes by reference now see "no change" here - confirm intended.
            return currentState;
        }

        // TODO: Mutating cluster state like this can be dangerous, this will need refactoring.
        validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
        validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);

        Map<String, String> joiningNodeAttributes = joiningNode.getAttributes();
        ClusterState newState = updateClusterStateWithRepositoryMetadata(
            currentState,
            buildRepositoryMetadata(joiningNode, joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY))
        );
        return updateClusterStateWithRepositoryMetadata(
            newState,
            buildRepositoryMetadata(joiningNode, joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY))
        );
    }

    /**
     * Adds {@code newRepositoryMetadata} to the repositories stored in the cluster state metadata.
     *
     * @param currentState cluster state to extend
     * @param newRepositoryMetadata repository to register
     * @return a cluster state containing the repository, or {@code currentState} itself if an equal
     *         repository (ignoring generations) is already registered
     * @throws IllegalStateException if a repository with the same name but different metadata already exists
     */
    private static ClusterState updateClusterStateWithRepositoryMetadata(
        ClusterState currentState,
        RepositoryMetadata newRepositoryMetadata
    ) {
        RepositoriesService.validate(newRepositoryMetadata.name());

        Metadata metadata = currentState.metadata();
        RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
        if (repositories == null) {
            repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
        } else {
            List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

            for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
                if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
                    if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
                        // Previous version is the same as this one, no update is needed.
                        return currentState;
                    }
                    throw new IllegalStateException(
                        "new repository metadata ["
                            + newRepositoryMetadata
                            + "] supplied by joining node is different from existing repository metadata ["
                            + repositoryMetadata
                            + "]"
                    );
                }
                repositoriesMetadata.add(repositoryMetadata);
            }
            repositoriesMetadata.add(newRepositoryMetadata);
            repositories = new RepositoriesMetadata(repositoriesMetadata);
        }

        Metadata.Builder mdBuilder = Metadata.builder(metadata);
        mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
        return ClusterState.builder(currentState).metadata(mdBuilder).build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Helpers for registering remote store repositories from node attributes. */
package org.opensearch.action.admin.cluster.remotestore.repository;
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
// we are checking source node commission status here to reject any join request coming from a decommissioned node
// even before executing the join task to fail fast
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());

JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation);
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.compareNodeAttributes;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreNode;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -140,6 +143,11 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
ClusterState.Builder newState;

if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
DiscoveryNode joiningNode = joiningNodes.get(0).node();
if (isRemoteStoreNode(joiningNode)) {
// TODO: Mutating cluster state like this can be dangerous, this will need refactoring.
currentState = validateOrAddRemoteStoreRepository(joiningNode, currentState);
}
return results.successes(joiningNodes).build(currentState);
} else if (currentNodes.getClusterManagerNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeClusterManagerTask)) {
assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask) : "becoming a cluster-manager but election is not finished "
Expand All @@ -160,6 +168,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
}

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
ClusterState intermediateState;

assert nodesBuilder.isLocalNodeElectedClusterManager();

Expand All @@ -176,6 +185,12 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
logger.debug("received a join request for an existing node [{}]", joinTask.node());
} else {
final DiscoveryNode node = joinTask.node();
if (isRemoteStoreNode(node)) {
// TODO: Mutating cluster state like this can be dangerous or anti pattern, this will need
// refactoring.
intermediateState = validateOrAddRemoteStoreRepository(node, newState.build());
newState = ClusterState.builder(intermediateState);
}
try {
if (enforceMajorVersion) {
ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
Expand All @@ -187,6 +202,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// we have added the same check in handleJoinRequest method and adding it here as this method
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());

ensureRemoteStoreNodesCompatibility(node, currentState);
nodesBuilder.add(node);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -422,6 +439,45 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
}
}

/**
 * Verifies that a joining node and the cluster it wants to join agree on remote store usage.
 * <p>
 * Two conditions are enforced:
 * <ol>
 * <li>a node joining a remote store cluster must itself be a remote store node, and</li>
 * <li>a node joining a non-remote store cluster must not be a remote store node.</li>
 * </ol>
 * A remote store node is one that holds all the remote store attributes, and a remote store cluster is one
 * that has only homogeneous remote store nodes with the same node attributes. The first node found in the
 * cluster state is used as the representative of the cluster.
 *
 * @param joiningNode node requesting to join
 * @param currentState cluster state whose existing nodes the joining node is checked against
 */
public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, ClusterState currentState) {
    final List<DiscoveryNode> clusterNodes = new ArrayList<>(currentState.getNodes().getNodes().values());

    // With no node in the cluster state the check is a no-op: at this point it cannot be determined
    // whether this is a remote store cluster or a non-remote store cluster.
    if (clusterNodes.isEmpty()) {
        return;
    }

    // TODO: The checks below are valid until migration is supported; once it is, a remote store node will be
    // able to join a non remote store cluster and vice versa. #7986
    final DiscoveryNode existingNode = clusterNodes.get(0);
    final boolean joiningIsRemoteStore = isRemoteStoreNode(joiningNode);
    final boolean existingIsRemoteStore = isRemoteStoreNode(existingNode);

    if (joiningIsRemoteStore && existingIsRemoteStore) {
        compareNodeAttributes(joiningNode, existingNode);
    } else if (joiningIsRemoteStore) {
        throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster");
    } else if (existingIsRemoteStore) {
        throw new IllegalStateException("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster");
    }
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand All @@ -430,6 +486,7 @@ public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoin
ensureNodesCompatibility(node.getVersion(), state.getNodes());
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
ensureNodeCommissioned(node, state.getMetadata());
ensureRemoteStoreNodesCompatibility(node, state);
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private Repository createRepository(RepositoryMetadata repositoryMetadata, Map<S
}
}

private static void validate(final String repositoryName) {
public static void validate(final String repositoryName) {
if (Strings.hasLength(repositoryName) == false) {
throw new RepositoryException(repositoryName, "cannot be empty");
}
Expand Down
Loading

0 comments on commit 5ada3cd

Please sign in to comment.