Skip to content

Commit

Permalink
Fixing flaky test cases and refactoring code
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <rissag@amazon.com>
  • Loading branch information
Rishav Sagar committed Jan 19, 2023
1 parent fd5c656 commit f401496
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -134,6 +134,7 @@ public void removeFilesystemProvider() {

// Increasing watermark limit to avoid flaky test case failures.
private static final long WATERMARK_BYTES = new ByteSizeValue(1, ByteSizeUnit.MB).getBytes();
private static final String INDEX_ROUTING_ALLOCATION_NODE_SETTING = "index.routing.allocation.include._name";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand Down Expand Up @@ -174,7 +175,7 @@ public void testHighWatermarkNotExceeded() throws Exception {
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long minShardSize = createAndPopulateIndex(indexName);
final long minShardSize = createAndPopulateIndex(indexName, null);

// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
Expand All @@ -200,12 +201,32 @@ public void testIndexCreateBlockWhenAllNodesExceededHighWatermark() throws Excep
populateNode(dataNodeName);
}

// Validate if cluster block is applied on the node
// Wait for all nodes to breach high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(DiskThresholdMonitor.INDEX_CREATE_BLOCK_ID));
}, 2L, TimeUnit.MINUTES);
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate if cluster block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockNotAppliedWhenAnyNodesBelowHighWatermark() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> dataNodeNames = internalCluster().startDataOnlyNodes(2);
ensureStableCluster(3);

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster()
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Validate cluster block is not applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() throws Exception {
Expand All @@ -224,20 +245,28 @@ public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermark() th
indexNames.add(indexName);
}

// Validate if index create block is applied on the cluster
// Wait for all the node to breach high disk watermark.
assertBusy(() -> {
refreshDiskUsage();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(DiskThresholdMonitor.INDEX_CREATE_BLOCK_ID));
}, 2L, TimeUnit.MINUTES);
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate if index create block is applied on the cluster
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));

// Delete indices to free space
deleteIndices(indexNames);

// Validate if index create block is removed on the cluster
assertBusy(() -> {
refreshDiskUsage();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state.blocks().hasGlobalBlockWithId(DiskThresholdMonitor.INDEX_CREATE_BLOCK_ID));
}, 2L, TimeUnit.MINUTES);
ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}, 30L, TimeUnit.SECONDS);
}

public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
Expand All @@ -248,25 +277,32 @@ public void testIndexCreateBlockWithAReadOnlyBlock() throws Exception {
.getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
internalCluster().getCurrentClusterManagerNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());

// Create one of the index.
final String indexName = populateNode(dataNodeNames.get(0));

// Reduce disk space of all other node until all of them is breaching high disk watermark.
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}

// Apply a read_only_allow_delete_block on one of the index
// (can happen if the corresponding node has breached flood stage watermark).
final Settings readOnlySettings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString())
.build();
client().admin().indices().prepareUpdateSettings(indexName).setSettings(readOnlySettings).get();

// Reduce disk space of all other node until all of them is breaching high disk watermark.
for (int i = 1; i < dataNodeNames.size(); i++) {
populateNode(dataNodeNames.get(i));
}

// Validate if cluster block is applied on the node
assertBusy(() -> {
refreshDiskUsage();
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(DiskThresholdMonitor.INDEX_CREATE_BLOCK_ID));
}, 2L, TimeUnit.MINUTES);
assertTrue(
StreamSupport.stream(clusterInfoService.getClusterInfo().getNodeLeastAvailableDiskUsages().values().spliterator(), false)
.allMatch(cur -> cur.value.getFreeBytes() < WATERMARK_BYTES)
);
}, 30L, TimeUnit.SECONDS);

// Validate index create block is applied on the cluster.
ClusterState state = client().admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
}

public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
Expand All @@ -291,7 +327,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final long minShardSize = createAndPopulateIndex(indexName);
final long minShardSize = createAndPopulateIndex(indexName, null);

final CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
Expand Down Expand Up @@ -356,22 +392,27 @@ private void deleteIndices(final List<String> indexNames) throws ExecutionExcept
private String populateNode(final String dataNodeName) throws Exception {
final Path dataNodePath = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
long minShardSize = createAndPopulateIndex(indexName);
long minShardSize = createAndPopulateIndex(indexName, dataNodeName);
fileSystemProvider.getTestFileStore(dataNodePath).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
refreshDiskUsage();
return indexName;
}

private long createAndPopulateIndex(final String indexName) throws Exception {
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
.build()
);
private long createAndPopulateIndex(final String indexName, final String nodeName) throws Exception {

final Settings.Builder indexSettingBuilder = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);

// Depending on node name specified or not, we determine whether to enable node name based shard routing for index.
if (nodeName != null) {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(INDEX_ROUTING_ALLOCATION_NODE_SETTING, nodeName);
} else {
indexSettingBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6);
}

createIndex(indexName, indexSettingBuilder.build());
return createReasonableSizedShards(indexName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
public class DiskThresholdMonitor {

private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class);
public static final int INDEX_CREATE_BLOCK_ID = 10;
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Supplier<ClusterState> clusterStateSupplier;
Expand Down Expand Up @@ -377,9 +376,9 @@ public void onNewInfo(ClusterInfo info) {

// If all the nodes are breaching high disk watermark, we apply index create block to avoid red clusters.
if (nodesOverHighThreshold.size() == nodes.size()) {
applyIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(INDEX_CREATE_BLOCK_ID)) {
applyIndexCreateBlock(listener, false);
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())) {
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
}
Expand Down Expand Up @@ -416,13 +415,9 @@ private void setLastRunTimeMillis() {
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
}

protected void applyIndexCreateBlock(final ActionListener<Void> listener, boolean indexCreateBlock) {
final ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
protected void setIndexCreateBlock(final ActionListener<Void> listener, boolean indexCreateBlock) {
final ActionListener<Void> wrappedListener = ActionListener.wrap(r -> { listener.onResponse(r); }, e -> {
logger.debug("setting index create block failed", e);
setLastRunTimeMillis();
listener.onFailure(e);
});

Expand All @@ -439,12 +434,8 @@ protected void applyIndexCreateBlock(final ActionListener<Void> listener, boolea

protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
// set read-only block but don't block on the response
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> { listener.onResponse(r); }, e -> {
logger.debug(new ParameterizedMessage("setting indices [{}] read-only failed", readOnly), e);
setLastRunTimeMillis();
listener.onFailure(e);
});
Settings readOnlySettings = readOnly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionLi
}

@Override
protected void applyIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};
Expand Down Expand Up @@ -192,7 +192,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionLi
}

@Override
protected void applyIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};
Expand Down Expand Up @@ -384,7 +384,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
}

@Override
protected void applyIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}

Expand Down Expand Up @@ -446,7 +446,7 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
}

@Override
protected void applyIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};
Expand Down Expand Up @@ -559,7 +559,7 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
}

@Override
protected void applyIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
listener.onResponse(null);
}
};
Expand Down

0 comments on commit f401496

Please sign in to comment.