
Adding index create block when all nodes have breached high disk watermark #5852

Merged · 2 commits · Jan 19, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Adding index create block when all nodes have breached high disk watermark ([#5852](https://github.com/opensearch-project/OpenSearch/pull/5852))
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))

### Dependencies
@@ -39,11 +39,14 @@

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -82,12 +85,15 @@
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -126,7 +132,9 @@ public void removeFilesystemProvider() {
defaultFileSystem = null;
}

- private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
+ // Increasing watermark limit to avoid flaky test case failures.
+ private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
@@ -167,16 +175,7 @@ public void testHighWatermarkNotExceeded() throws Exception {
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
- createIndex(
-     indexName,
-     Settings.builder()
-         .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-         .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
-         .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
-         .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
-         .build()
- );
- final long minShardSize = createReasonableSizedShards(indexName);
+ final long minShardSize = createAndPopulateIndex(indexName, null);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
@@ -188,6 +187,124 @@ public void testHighWatermarkNotExceeded() throws Exception {
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Reduce disk space on all nodes until every node breaches the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
populateNode(dataNodeName);
}

// Wait for all nodes to breach the high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate that the index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Validate that the index create block is not applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
final List<String> indexNames = new ArrayList<>();
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Reduce disk space on all nodes until every node breaches the high disk watermark.
for (final String dataNodeName : dataNodeNames) {
final String indexName = populateNode(dataNodeName);
indexNames.add(indexName);
}

// Wait for all nodes to breach the high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate that the index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));

// Delete indices to free space
deleteIndices(indexNames);

// Validate that the index create block is removed from the cluster
assertBusy(() -> {
refreshDiskUsage();
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Create an index pinned to the first data node.
final String indexName = populateNode(dataNodeNames.get(0));

// Reduce disk space on all other nodes until every node breaches the high disk watermark.
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}

// Apply a read_only_allow_delete block on one of the indices
// (this can happen when the corresponding node breaches the flood stage watermark).
final Settings readOnlySettings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();

assertBusy(() -> {
refreshDiskUsage();
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate that the index create block is applied on the cluster.
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
@@ -210,16 +327,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
- createIndex(
-     indexName,
-     Settings.builder()
-         .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-         .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
-         .put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
-         .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
-         .build()
- );
- final long minShardSize = createReasonableSizedShards(indexName);
+ final long minShardSize = createAndPopulateIndex(indexName, null);

final CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
@@ -274,6 +382,40 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
}

private void deleteIndices(final List<String> indexNames) throws ExecutionException, InterruptedException {
for (String indexName : indexNames) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName)).get());
assertFalse("index [" + indexName + "] should have been deleted", indexExists(indexName));
}
}

private String populateNode(final String dataNodeName) throws Exception {
final Path dataNodePath = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
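// Shrink the node's filesystem to minShardSize + WATERMARK_BYTES - 1 bytes so that,
// once the shard is on disk, less than WATERMARK_BYTES remains free and the node breaches the high watermark.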
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
return indexName;
}

private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception {

final Settings.Builder indexSettingBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);

// If a node name is given, create a single-shard index pinned to that node; otherwise create six freely allocated shards.
if (nodeName != null) {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
} else {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
}

createIndex(indexName, indexSettingBuilder.build());
return createReasonableSizedShards(indexName);
}

private Set<ShardRouting> getShardRoutings(final String nodeId, final String indexName) {
final Set<ShardRouting> shardRoutings = new HashSet<>();
for (IndexShardRoutingTable indexShardRoutingTable : client().admin()
@@ -45,6 +45,7 @@
import org.opensearch.cluster.DiskUsage;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
@@ -78,7 +79,6 @@
public class DiskThresholdMonitor {

private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class);

private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Supplier<ClusterState> clusterStateSupplier;
@@ -286,7 +286,7 @@ public void onNewInfo(ClusterInfo info) {
}
}

final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
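// The grouped count is now 4 (was 3): the index create block update added below must also complete before checkFinished runs.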
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4);

if (reroute) {
logger.debug("rerouting shards: [{}]", explanation);
@@ -373,6 +382,15 @@ public void onNewInfo(ClusterInfo info) {
} else {
listener.onResponse(null);
}

// If all nodes are breaching the high disk watermark, apply an index create block to avoid a red cluster.
if (nodesOverHighThreshold.size() == nodes.size()) {
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())) {
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
}
}

// exposed for tests to override
@@ -406,6 +415,27 @@ private void setLastRunTimeMillis() {
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
}

protected void setIndexCreateBlock(final ActionListener<Void> listener, boolean indexCreateBlock) {
final ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug("setting index create block failed", e);
setLastRunTimeMillis();
listener.onFailure(e);
});

final Settings indexCreateBlockSetting = indexCreateBlock
? Settings.builder().put(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey(), Boolean.TRUE.toString()).build()
: Settings.builder().putNull(Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING.getKey()).build();

client.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(indexCreateBlockSetting)
.execute(ActionListener.map(wrappedListener, r -> null));
}

protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
// set read-only block but don't block on the response
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
@@ -129,6 +129,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener
assertTrue(readOnly);
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};

ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
@@ -185,6 +190,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener
assertTrue(readOnly);
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};

indices.set(null);
@@ -372,6 +382,12 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
}
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}

};
indicesToMarkReadOnly.set(null);
indicesToRelease.set(null);
@@ -428,6 +444,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
}
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};
// When free disk on either node1 or node2 drops below the 5% flood watermark, apply the index block to indices that do not already have it
indicesToMarkReadOnly.set(null);
@@ -536,6 +557,11 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return relocatingShardSizeRef.get();
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};

final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
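Note: a minimal sketch (not part of this PR) of how an operator could clear the block manually after freeing disk space, assuming the persistent setting key behind Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING is "cluster.blocks.create_index" and using the same cluster settings API the monitor uses above:

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

final class ClearCreateIndexBlockSketch {
    static void clearCreateIndexBlock(final Client client) {
        // putNull removes the persistent setting entirely, which lifts the global
        // create-index block; the DiskThresholdMonitor will simply re-apply it on the
        // next ClusterInfo refresh if every node is still over the high watermark.
        final Settings clearBlock = Settings.builder()
            .putNull("cluster.blocks.create_index") // assumed key; see SETTING_CREATE_INDEX_BLOCK_SETTING
            .build();
        client.admin().cluster().prepareUpdateSettings().setPersistentSettings(clearBlock).get();
    }
}

This mirrors setIndexCreateBlock(listener, false) in the diff, which likewise uses putNull to clear the setting rather than writing an explicit "false".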