Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix NPE on restore searchable snapshot #13920

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix get field mapping API returns 404 error in mixed cluster with multiple versions ([#13624](https://github.com/opensearch-project/OpenSearch/pull/13624))
- Allow clearing `remote_store.compatibility_mode` setting ([#13646](https://github.com/opensearch-project/OpenSearch/pull/13646))
- Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903))
- Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.node.Node;
import org.opensearch.repositories.fs.FsRepository;
import org.hamcrest.MatcherAssert;
import org.junit.After;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -62,6 +63,10 @@

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -939,6 +944,52 @@ public void testRelocateSearchableSnapshotIndex() throws Exception {
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

/**
 * Snapshots two indices and restores them while the transient cluster setting
 * {@code DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING} is set to 5, then verifies the document count of
 * both {@code -copy} indices and runs {@code assertIndexDirectoryDoesNotExist} on them.
 *
 * The cluster is started with two cluster-manager-only nodes and two data nodes; search nodes are
 * added on demand before the restore.
 */
public void testCreateSearchableSnapshotWithSpecifiedRemoteDataRatio() throws Exception {
    final String repoName = "test-repo";
    final String snapshotName = "test-snap";
    final String firstIndex = "test-idx-1";
    final String secondIndex = "test-idx-2";
    // NOTE(review): the "-copy" suffix is presumably applied by restoreSnapshotAndEnsureGreen - confirm against the helper.
    final String firstRestoredIndex = firstIndex + "-copy";
    final String secondRestoredIndex = secondIndex + "-copy";
    final int firstIndexReplicas = 1;
    final int secondIndexReplicas = 1;

    final Settings clusterManagerSettings = clusterManagerOnlyNode();
    internalCluster().startNodes(2, clusterManagerSettings);
    final Settings dataNodeSettings = dataNode();
    internalCluster().startNodes(2, dataNodeSettings);
    createIndexWithDocsAndEnsureGreen(firstIndexReplicas, 100, firstIndex);
    createIndexWithDocsAndEnsureGreen(secondIndexReplicas, 100, secondIndex);

    // Configure an explicit remote data ratio before any snapshot is restored.
    final Client client = client();
    final Settings.Builder ratioSettings = Settings.builder().put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5);
    assertAcked(client.admin().cluster().prepareUpdateSettings().setTransientSettings(ratioSettings));

    createRepositoryWithSettings(null, repoName);
    takeSnapshot(client, snapshotName, repoName, firstIndex, secondIndex);

    // One search node per shard copy: the primary plus the largest replica count.
    final int requiredSearchNodes = Math.max(firstIndexReplicas, secondIndexReplicas) + 1;
    internalCluster().ensureAtLeastNumSearchNodes(requiredSearchNodes);
    restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);

    assertDocCount(firstRestoredIndex, 100L);
    assertDocCount(secondRestoredIndex, 100L);
    assertIndexDirectoryDoesNotExist(firstRestoredIndex, secondRestoredIndex);
}

/**
 * Removes the transient {@code DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING} override by setting its key
 * to null, and asserts that the cluster acknowledged the update.
 */
@After
public void cleanup() throws Exception {
    final Settings.Builder resetRatio = Settings.builder().putNull(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey());
    assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(resetRatio));
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -59,8 +58,6 @@
import java.util.Set;
import java.util.stream.Stream;

import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
* Transport action for updating index settings
*
Expand Down Expand Up @@ -133,9 +130,7 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
for (Index index : requestIndices) {
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
);
&& state.getMetadata().getIndexSafe(index).isRemoteSnapshot();
}
}
// check if all settings in the request are in the allow list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexModule;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -399,7 +398,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (indexMetadata.isRemoteSnapshot()) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -685,6 +686,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ActiveShardCount waitForActiveShards;
private final Map<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;
private final boolean isRemoteSnapshot;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -745,6 +747,7 @@ private IndexMetadata(
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
this.isSystem = isSystem;
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -1228,6 +1231,10 @@ public boolean isSystem() {
return isSystem;
}

/**
 * Tells whether this index is a remote snapshot index.
 * The flag is computed once in the constructor by matching the index settings against
 * {@code IndexModule.Type.REMOTE_SNAPSHOT}.
 *
 * @return {@code true} if the index settings matched the remote snapshot store type
 */
public boolean isRemoteSnapshot() {
    return this.isRemoteSnapshot;
}

/**
 * Creates a new {@link Builder} from the given string.
 *
 * @param index value handed to the {@code Builder} constructor (presumably the index name; confirm against Builder)
 * @return a newly constructed {@code Builder}
 */
public static Builder builder(String index) {
return new Builder(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;

Expand Down Expand Up @@ -242,9 +241,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (IndexModule.Type.REMOTE_SNAPSHOT.match(
indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
) && (preference == null || preference.isEmpty())) {
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -60,10 +58,6 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

Expand All @@ -68,7 +69,6 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -109,11 +109,13 @@ public class DiskThresholdDecider extends AllocationDecider {

private final DiskThresholdSettings diskThresholdSettings;
private final boolean enableForSingleDataNode;
private final FileCacheSettings fileCacheSettings;

/**
 * Creates the decider and the settings holders it consults.
 *
 * @param settings        settings from which the initial values are read
 * @param clusterSettings cluster settings handed to the settings holders; {@code FileCacheSettings}
 *                        registers an update consumer on it for the remote data ratio
 */
public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
    diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
    // Reminder to drop the single-data-node switch once the major version reaches 9.
    assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9";
    enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings);
    // Holds the remote-data-to-file-cache ratio and keeps it current on dynamic updates.
    fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
}

/**
Expand Down Expand Up @@ -179,6 +181,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
// we don't need to check the ratio
if (dataToFileCacheSizeRatio <= 0.1f) {
return Decision.YES;
}

final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
Expand All @@ -199,7 +207,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
Expand All @@ -208,6 +215,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
);
}
return Decision.YES;
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.NO;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -690,7 +690,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ public static IndexMergePolicy fromString(String text) {
private volatile TimeValue remoteTranslogUploadBufferInterval;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -919,9 +918,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
if (isRemoteSnapshot() && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
} else {
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
Expand Down Expand Up @@ -1273,7 +1271,7 @@ public boolean isRemoteTranslogStoreEnabled() {
* Returns true if this is remote/searchable snapshot
*/
public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
return indexMetadata.isRemoteSnapshot();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
Expand Down Expand Up @@ -52,21 +51,6 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
 * Settings related to the file cache.
 * <p>
 * Holds the current value of {@link #DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING} and keeps it up to date
 * by registering an update consumer with the supplied {@link ClusterSettings}.
 *
 * @opensearch.internal
 */
public class FileCacheSettings {
    /**
     * Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
     * the file cache. For example, if 100GB disk space is configured for use as a file cache and the
     * remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
     * This is designed to be a safeguard to prevent oversubscribing a cluster.
     * Specify a value of zero for no limit, which is the default for compatibility reasons.
     */
    public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
        "cluster.filecache.remote_data_ratio",
        0.0,
        0.0,
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );

    /**
     * Latest known ratio. Declared volatile so that a value written through the registered update
     * consumer is visible to callers of {@link #getRemoteDataRatio()}.
     */
    private volatile double remoteDataRatio;

    /**
     * Reads the initial ratio from {@code settings} and subscribes to later changes.
     *
     * @param settings        settings from which the initial ratio is read
     * @param clusterSettings cluster settings on which the update consumer is registered
     */
    public FileCacheSettings(Settings settings, ClusterSettings clusterSettings) {
        // Assign the field directly: calling the public, overridable setter from the constructor
        // would run subclass code before the subclass has been initialised.
        this.remoteDataRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(settings);
        clusterSettings.addSettingsUpdateConsumer(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, this::setRemoteDataRatio);
    }

    /**
     * Replaces the stored ratio. Also used as the consumer for dynamic updates of
     * {@link #DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING}.
     *
     * @param remoteDataRatio new ratio of remote data size to file cache size
     */
    public void setRemoteDataRatio(double remoteDataRatio) {
        this.remoteDataRatio = remoteDataRatio;
    }

    /**
     * Returns the most recently stored ratio.
     *
     * @return the current remote-data-to-file-cache ratio
     */
    public double getRemoteDataRatio() {
        return remoteDataRatio;
    }
}
Loading
Loading