Skip to content

Commit

Permalink
Updating code and adding unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
psychbot committed Aug 15, 2023
1 parent 77af2c5 commit 0442656
Show file tree
Hide file tree
Showing 3 changed files with 432 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class RemoteStoreRepositoryRegistrationHelper {
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY = "remote_store.translog.repository.type";
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY = "remote_store.translog.repository.settings";

private static void validateAttributeNonNull(DiscoveryNode joiningNode, String attributeKey) {
if (joiningNode.getAttributes().get(attributeKey) == null) {
throw new IllegalStateException("joining node [" + joiningNode + "] doesn't have the node attribute [" + attributeKey + "]");
}
}

/**
* A node will be declared as remote store node if it has any of the remote store node attributes.
* The method validates that the joining node has any of the remote store node attributes or not.
Expand All @@ -49,144 +55,48 @@ public static boolean isRemoteStoreNode(DiscoveryNode joiningNode) {
|| joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY) != null;

if (remoteStoreNode) {
if (joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
+ "]"
);
}
if (joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY
+ "]"
);
}
if (joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY
+ "]"
);
}
if (joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
+ "]"
);
}
if (joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY
+ "]"
);
}
if (joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY) != null) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] doesn't have the node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY
+ "]"
);
}
validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY);
validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY);
validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
}

return remoteStoreNode;
}

// TODO: See a better way to compare the remote store node attributes.
public static void validateNodeAttributes(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
Map<String, String> joiningNodeAttributes = joiningNode.getAttributes();
Map<String, String> existingNodeAttributes = existingNode.getAttributes();
private static void compareAttribute(DiscoveryNode joiningNode, DiscoveryNode existingNode, String attributeKey) {
String joiningNodeAttribute = joiningNode.getAttributes().get(attributeKey);
String existingNodeAttribute = existingNode.getAttributes().get(attributeKey);

if (!existingNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY))) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
+ "] different than existing node ["
+ existingNode
+ "]"
);
}
if (!existingNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY))) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY
+ "] different than existing node ["
+ existingNode
+ "]"
);
}
if (!existingNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY))) {
if (existingNodeAttribute.equals(joiningNodeAttribute) == false) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY
+ "] different than existing node ["
+ existingNode
+ "]"
);
}
if (!existingNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY))) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
+ "] different than existing node ["
+ existingNode
+ "]"
);
}
if (!existingNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY))) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY
+ "] different than existing node ["
+ existingNode
+ "]"
);
}
if (!existingNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY)
.equals(joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY))) {
throw new IllegalStateException(
"joining node ["
+ joiningNode
+ "] has node attribute ["
+ REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY
+ "] different than existing node ["
+ attributeKey
+ "] value ["
+ joiningNodeAttribute
+ "] which is different than existing node ["
+ existingNode
+ "] value ["
+ existingNodeAttribute
+ "]"
);
}
}

// TODO: See a better way to compare the remote store node attributes.
public static void compareNodeAttributes(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY);
compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY);
compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
}

private static Settings buildSettings(String stringSettings) {
Settings.Builder settings = Settings.builder();

Expand All @@ -199,7 +109,9 @@ private static Settings buildSettings(String stringSettings) {
return settings.build();
}

private static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode node) {
// TODO: Add logic to mark these repository as System Repository once thats merged.
// Visible For testing
public static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode node) {
String name = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String type = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_TYPE_ATTRIBUTE_KEY);
String stringSettings = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
Expand All @@ -212,7 +124,9 @@ private static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode n

}

private static RepositoryMetadata buildTranslogRepositoryMetadata(DiscoveryNode node) {
// TODO: Add logic to mark these repository as System Repository once thats merged.
// Visible For testing
public static RepositoryMetadata buildTranslogRepositoryMetadata(DiscoveryNode node) {
String name = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
String type = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_TYPE_ATTRIBUTE_KEY);
String stringSettings = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_SETTINGS_ATTRIBUTE_KEY);
Expand All @@ -237,13 +151,11 @@ public static ClusterState validateOrAddRemoteStoreRepository(DiscoveryNode join

// TODO: Mutating cluster state like this can be dangerous, this will need refactoring.
if (existingNodes.size() == 0) {
// TODO: Will need to add the node to cluster state once the repository metadata is updated, such
// that subsequent joining nodes attributes gets validated against the existing nodes.
newState = updateClusterStateWithRepositoryMetadata(currentState, buildSegmentRepositoryMetadata(joiningNode));
newState = updateClusterStateWithRepositoryMetadata(newState, buildTranslogRepositoryMetadata(joiningNode));
return newState;
} else {
validateNodeAttributes(joiningNode, existingNodes.get(0));
compareNodeAttributes(joiningNode, existingNodes.get(0));
}

return newState;
Expand All @@ -261,24 +173,27 @@ private static ClusterState updateClusterStateWithRepositoryMetadata(
if (repositories == null) {
repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
} else {
boolean found = false;
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
// Previous version is the same as this one no update is needed.
return new ClusterState.Builder(currentState).build();
} else {
throw new IllegalStateException(
"new repository metadata ["
+ newRepositoryMetadata
+ "] supplied by joining node is different from existing repository metadata ["
+ repositoryMetadata
+ "]"
);
}
found = true;
repositoriesMetadata.add(newRepositoryMetadata);
} else {
repositoriesMetadata.add(repositoryMetadata);
}
}
if (!found) {
repositoriesMetadata.add(newRepositoryMetadata);
}
repositoriesMetadata.add(newRepositoryMetadata);
repositories = new RepositoriesMetadata(repositoriesMetadata);
}
mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreNode;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateNodeAttributes;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.compareNodeAttributes;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
Expand Down Expand Up @@ -460,20 +460,20 @@ public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode
return;
}

/**
* TODO: The below check is valid till we support migration, once we start supporting migration a remote
* store node will be able to join a non remote store cluster and vice versa. #7986
*/
if (isRemoteStoreNode(joiningNode)) {
if (isRemoteStoreNode(existingNodes.get(0))) {
DiscoveryNode existingNode = existingNodes.get(0);
validateNodeAttributes(joiningNode, existingNode);
compareNodeAttributes(joiningNode, existingNode);
} else {
throw new IllegalStateException(
"node [" + joiningNode + "] is a restore store node and trying to join a non remote store cluster"
);
throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster");
}
} else {
if (isRemoteStoreNode(existingNodes.get(0))) {
throw new IllegalStateException(
"node [" + joiningNode + "] is a non restore store node and trying to join a remote store cluster"
);
throw new IllegalStateException("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster");
}
}
}
Expand Down
Loading

0 comments on commit 0442656

Please sign in to comment.