-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
247 additions
and
4 deletions.
There are no files selected for viewing
146 changes: 146 additions & 0 deletions
146
.../action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.cluster.remotestore.repository; | ||
|
||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.metadata.Metadata; | ||
import org.opensearch.cluster.metadata.RepositoriesMetadata; | ||
import org.opensearch.cluster.metadata.RepositoryMetadata; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.repositories.RepositoriesService; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
/** | ||
* RemoteStore Repository Registration helper | ||
*/ | ||
public class RemoteStoreRepositoryRegistrationHelper { | ||
|
||
public static boolean isRemoteStoreEnabled(ClusterState currentState) { | ||
return currentState.getMetadata().settings().get("cluster.remote_store.segment.repository") != null | ||
&& currentState.getMetadata().settings().get("cluster.remote_store.translog.repository") != null; | ||
} | ||
|
||
private static Settings buildSettings(String stringSettings) { | ||
Settings.Builder settings = Settings.builder(); | ||
|
||
String[] stringKeyValue = stringSettings.split(","); | ||
for (int i = 0; i < stringKeyValue.length; i++) { | ||
String[] keyValue = stringKeyValue[i].split(":"); | ||
settings.put(keyValue[0].trim(), keyValue[1].trim()); | ||
} | ||
|
||
return settings.build(); | ||
} | ||
|
||
public static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode node) { | ||
|
||
String name = node.getAttributes().get("remote_store.segment.repository.name"); | ||
String type = node.getAttributes().get("remote_store.segment.repository.type"); | ||
String stringSettings = node.getAttributes().get("remote_store.segment.repository.settings"); | ||
|
||
assert name != null : "Name cannot be null"; | ||
assert type != null : "Type cannot be null"; | ||
assert stringSettings != null : "Settings cannot be null"; | ||
|
||
return new RepositoryMetadata(name, type, buildSettings(stringSettings)); | ||
|
||
} | ||
|
||
public static RepositoryMetadata buildTranslogRepositoryMetadata(DiscoveryNode node) { | ||
String name = node.getAttributes().get("remote_store.translog.repository.name"); | ||
String type = node.getAttributes().get("remote_store.translog.repository.type"); | ||
String stringSettings = node.getAttributes().get("remote_store.translog.repository.settings"); | ||
|
||
assert name != null : "Name cannot be null"; | ||
assert type != null : "Type cannot be null"; | ||
assert stringSettings != null : "Settings cannot be null"; | ||
|
||
return new RepositoryMetadata(name, type, buildSettings(stringSettings)); | ||
} | ||
|
||
synchronized public static ClusterState validateOrAddRemoteStoreRepository(ClusterState currentState, DiscoveryNode node) { | ||
|
||
assert currentState.getMetadata() | ||
.settings() | ||
.get("cluster.remote_store.segment.repository") | ||
.equals(node.getAttributes().get("remote_store.segment.repository.name")); | ||
assert currentState.getMetadata() | ||
.settings() | ||
.get("cluster.remote_store.translog.repository") | ||
.equals(node.getAttributes().get("remote_store.translog.repository.name")); | ||
|
||
boolean segmentStoreRepositoryFound = false; | ||
boolean translogStoreRepositoryFound = false; | ||
RepositoryMetadata newSegmentRepositoryMetadata = buildSegmentRepositoryMetadata(node); | ||
RepositoryMetadata newTranslogRepositoryMetadata = buildTranslogRepositoryMetadata(node); | ||
|
||
RepositoriesMetadata repositories = currentState.metadata().custom(RepositoriesMetadata.TYPE); | ||
if (repositories != null) { | ||
for (RepositoryMetadata existingRepositoryMetadata : repositories.repositories()) { | ||
if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.segment.repository.name"))) { | ||
assert existingRepositoryMetadata.equalsIgnoreGenerations(newSegmentRepositoryMetadata); | ||
segmentStoreRepositoryFound = true; | ||
} | ||
if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.translog.repository.name"))) { | ||
assert existingRepositoryMetadata.equalsIgnoreGenerations(newTranslogRepositoryMetadata); | ||
translogStoreRepositoryFound = true; | ||
} | ||
} | ||
} | ||
|
||
ClusterState.Builder newState = ClusterState.builder(currentState); | ||
|
||
if (!segmentStoreRepositoryFound) { | ||
newState = AddRepositoryInformation(currentState, newSegmentRepositoryMetadata); | ||
currentState = newState.build(); | ||
} | ||
if (!translogStoreRepositoryFound) { | ||
newState = AddRepositoryInformation(currentState, newTranslogRepositoryMetadata); | ||
} | ||
|
||
return newState.build(); | ||
} | ||
|
||
private static ClusterState.Builder AddRepositoryInformation(ClusterState currentState, RepositoryMetadata newRepositoryMetadata) { | ||
RepositoriesService.validate(newRepositoryMetadata.name()); | ||
|
||
Metadata metadata = currentState.metadata(); | ||
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); | ||
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); | ||
if (repositories == null) { | ||
repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata)); | ||
} else { | ||
boolean found = false; | ||
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); | ||
|
||
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { | ||
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { | ||
if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { | ||
// Previous version is the same as this one no update is needed. | ||
return new ClusterState.Builder(currentState); | ||
} | ||
found = true; | ||
repositoriesMetadata.add(newRepositoryMetadata); | ||
} else { | ||
repositoriesMetadata.add(repositoryMetadata); | ||
} | ||
} | ||
if (!found) { | ||
repositoriesMetadata.add(newRepositoryMetadata); | ||
} | ||
repositories = new RepositoriesMetadata(repositoriesMetadata); | ||
} | ||
mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); | ||
return ClusterState.builder(currentState).metadata(mdBuilder); | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
...rc/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** Restore remote store transport handler. */ | ||
package org.opensearch.action.admin.cluster.remotestore.repository; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
server/src/main/java/org/opensearch/cluster/coordination/RemoteStoreJoinTaskExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package org.opensearch.cluster.coordination; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.node.DiscoveryNode; | ||
import org.opensearch.cluster.routing.RerouteService; | ||
import org.opensearch.cluster.routing.allocation.AllocationService; | ||
import org.opensearch.common.settings.Settings; | ||
|
||
import java.util.List; | ||
|
||
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository; | ||
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreEnabled; | ||
|
||
/** | ||
* Main executor for Remote Store Nodes joining the OpenSearch cluster | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemoteStoreJoinTaskExecutor extends JoinTaskExecutor { | ||
public RemoteStoreJoinTaskExecutor( | ||
Settings settings, | ||
AllocationService allocationService, | ||
Logger logger, | ||
RerouteService rerouteService | ||
) { | ||
super(settings, allocationService, logger, rerouteService); | ||
} | ||
|
||
// TODO: Add verifyRepositoryListener to validate if all the nodes are aware of the repository. | ||
@Override | ||
public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> joiningNodes) throws Exception { | ||
ClusterState intermediateState, newState = currentState; | ||
|
||
if (isRemoteStoreEnabled(currentState)) { | ||
for (final Task joinTask : joiningNodes) { | ||
if (joinTask.isBecomeClusterManagerTask() || joinTask.isFinishElectionTask()) { | ||
// noop | ||
} else if (currentState.nodes().nodeExistsWithSameRoles(joinTask.node())) { | ||
// noop | ||
} else { | ||
final DiscoveryNode node = joinTask.node(); | ||
intermediateState = validateOrAddRemoteStoreRepository(newState, node); | ||
newState = intermediateState; | ||
} | ||
} | ||
} | ||
return super.execute(newState, joiningNodes); | ||
} | ||
|
||
/** | ||
* Questions - | ||
* | ||
*/ | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters