Skip to content

Commit

Permalink
[Remote Store] Introducing mixed mode support for remote store migrat…
Browse files Browse the repository at this point in the history
…ion (opensearch-project#11986)

* Introducing mixed mode support for remote store migration

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

* Adding changelog

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

* Refactoring IT and naming changes

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

* Missing java doc

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored and Peter Alfonsi committed Mar 1, 2024
1 parent d260214 commit 5d43ba0
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class DocRepMigrationTestCase extends MigrationBaseTestCase {

public void testMixedModeAddDocRep() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);

Client client = internalCluster().client(cmNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
addRemote = false;
internalCluster().startNode();
String[] allNodes = internalCluster().getNodeNames();
assertBusy(() -> { assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), allNodes.length); });
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;

boolean addRemote = false;

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
if (addRemote) {
logger.info("Adding remote store node");
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put("discovery.initial_state_timeout", "500ms")
.build();
} else {
logger.info("Adding docrep node");
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build();
}
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteStoreMigrationTestCase extends MigrationBaseTestCase {
public void testMixedModeAddRemoteNodes() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Client client = internalCluster().client(cmNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// add remote node in mixed mode cluster
addRemote = true;
internalCluster().startNode();
internalCluster().startNode();
internalCluster().validateClusterFormed();

// assert repo gets registered
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
assertEquals(1, getRepositoriesResponse.repositories().size());

// add docrep mode in mixed mode cluster
addRemote = true;
internalCluster().startNode();
assertBusy(() -> {
assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), internalCluster().getNodeNames().length);
});

// add incompatible remote node in remote mixed cluster
Settings.Builder badSettings = Settings.builder()
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, "REPOSITORY_2_NAME", translogRepoPath))
.put("discovery.initial_state_timeout", "500ms");
String badNode = internalCluster().startNode(badSettings);
assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length);
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(badNode));
}

public void testMigrationDirections() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
// add remote node in docrep cluster
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "docrep"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "random"));
assertThrows(IllegalArgumentException.class, () -> client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -176,12 +177,13 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

// TODO: We are using one of the existing node to build the repository metadata, this will need to be updated
// once we start supporting mixed compatibility mode. An optimization can be done as this will get invoked
// An optimization can be done as this will get invoked
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
(currentNodes.getNodes().values()).stream().findFirst().get(),
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);

Expand Down Expand Up @@ -212,6 +214,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty()) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -495,36 +507,46 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod

assert existingNodes.isEmpty() == false;

// TODO: The below check is valid till we don't support migration, once we start supporting migration a remote
// store node will be able to join a non remote store cluster and vice versa. #7986
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
} else {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
);
}
} else {
if (existingNode.isRemoteStoreNode()) {
}
} else {
if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) {
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ protected FeatureFlagSettings(
FeatureFlags.TELEMETRY_SETTING,
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING,
FeatureFlags.DOC_ID_FUZZY_SET_SETTING
FeatureFlags.DOC_ID_FUZZY_SET_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING
);
}
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/common/util/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* @opensearch.internal
*/
public class FeatureFlags {
/**
* Gates the visibility of the remote store migration support from docrep .
*/
public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.migration.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis.
Expand Down Expand Up @@ -98,6 +103,12 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
}
}

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Property.NodeScope
);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope);
Expand Down
Loading

0 comments on commit 5d43ba0

Please sign in to comment.