Skip to content

Commit

Permalink
Fixing unblock condition for index create block
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <rissag@amazon.com>
  • Loading branch information
Rishav Sagar committed Aug 18, 2023
1 parent 46f2bd0 commit 95990be
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Fix condition to remove index create block ([#9436](https://github.com/opensearch-project/OpenSearch/pull/9436))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,18 @@ public void onNewInfo(ClusterInfo info) {
if ((state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()) == false)
&& nodes.size() > 0
&& nodesOverHighThreshold.size() == nodes.size()) {
setIndexCreateBlock(listener, true);
logger.warn("Enabling index create block on cluster as all nodes are breaching high disk watermark."
+ "Number of nodes above high watermark: {}. Total numbers of node: {}",
nodesOverHighThreshold.size(), nodes.size());
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) {
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()
&& nodesOverHighThreshold.size() != nodes.size()) {
logger.warn("Removing index create block on cluster as all nodes are no longer breaching high disk watermark.");
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
}
} else {
listener.onResponse(null);
}
}

// exposed for tests to override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -69,6 +64,7 @@

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE;

public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -581,11 +577,15 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
);

advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
final List<String> messages = new ArrayList<>();
messages.add("high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete");
messages.add("Enabling index create block on cluster as all nodes are breaching high disk watermark."
+ "Number of nodes above high watermark: 1. Total numbers of node: 1");
assertMultipleWarningMessages(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
messages
);

advanceTime.set(true);
Expand All @@ -605,21 +605,14 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC

relocatingShardSizeRef.set(-5L);
advanceTime.set(true);
assertSingleInfoMessage(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to be below the high disk watermark when these relocations are complete"
);

relocatingShardSizeRef.set(0L);
timeSupplier.getAsLong(); // advance time long enough to do another reroute
advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
assertMultipleWarningMessages(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
messages
);

advanceTime.set(true);
Expand Down Expand Up @@ -722,6 +715,109 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
assertTrue(countBlocksCalled.get() == 0);
}

/**
 * Verifies the auto-release condition of the cluster-wide index create block.
 *
 * The cluster state starts with the create-index block already applied and auto-release enabled.
 * While every node is still over the high disk watermark the block must be left alone (no block
 * call, no unblock call). As soon as at least one node is no longer over the high watermark the
 * block must be released exactly once.
 *
 * Note: despite the method name, the release is triggered when a node drops <em>below</em> the
 * high watermark, not when one goes above it.
 */
public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() {
    AllocationService allocation = createAllocationService(
        Settings.builder()
            .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
            .put("cluster.blocks.create_index.enabled", false)
            .build()
    );
    Metadata metadata = Metadata.builder()
        .put(
            IndexMetadata.builder("test")
                .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node2"))
                .numberOfShards(1)
                .numberOfReplicas(0)
        )
        .put(
            IndexMetadata.builder("test_1")
                .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
                .numberOfShards(1)
                .numberOfReplicas(0)
        )
        .put(
            IndexMetadata.builder("test_2")
                .settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
                .numberOfShards(1)
                .numberOfReplicas(0)
        )
        .build();
    RoutingTable routingTable = RoutingTable.builder()
        .addAsNew(metadata.index("test"))
        .addAsNew(metadata.index("test_1"))
        .addAsNew(metadata.index("test_2"))
        .build();

    // The cluster state already carries the create-index block before the monitor runs.
    final ClusterState clusterState = applyStartedShardsUntilNoChange(
        ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
            .metadata(metadata)
            .routingTable(routingTable)
            .blocks(ClusterBlocks.builder().addGlobalBlock(Metadata.CLUSTER_CREATE_INDEX_BLOCK).build())
            .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
            .build(),
        allocation
    );
    AtomicReference<Set<String>> indices = new AtomicReference<>();
    AtomicInteger countBlocksCalled = new AtomicInteger();
    AtomicInteger countUnblockBlocksCalled = new AtomicInteger();
    AtomicLong currentTime = new AtomicLong();
    // Auto-release must be enabled, otherwise the monitor never attempts to remove the block.
    Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build();
    DiskThresholdMonitor monitor = new DiskThresholdMonitor(
        settings,
        () -> clusterState,
        new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
        null,
        currentTime::get,
        (reason, priority, listener) -> {
            listener.onResponse(null);
        }
    ) {

        @Override
        protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
            // Only a single "mark read-only" call is tolerated during this test.
            assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
            assertTrue(readOnly);
            listener.onResponse(null);
        }

        @Override
        protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
            // Count block and unblock requests separately so each phase can be asserted on.
            if (indexCreateBlock) {
                countBlocksCalled.incrementAndGet();
            } else {
                countUnblockBlocksCalled.incrementAndGet();
            }

            listener.onResponse(null);
        }
    };

    Map<String, DiskUsage> builder = new HashMap<>();

    // Keep every node above the high disk watermark
    // (presumably 9 free bytes out of 100; confirm the DiskUsage argument order).
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 9));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 9));
    monitor.onNewInfo(clusterInfo(builder));
    // Block is already present and all nodes are still above the high watermark,
    // so neither block nor unblock will be called.
    assertEquals(0, countBlocksCalled.get());
    assertEquals(0, countUnblockBlocksCalled.get());

    monitor.onNewInfo(clusterInfo(builder));
    // Same disk usage again: neither block nor unblock will be called in the second iteration either.
    assertEquals(0, countBlocksCalled.get());
    assertEquals(0, countUnblockBlocksCalled.get());

    builder = new HashMap<>();

    // Change the usage so that not every node is above the high disk watermark any more
    // (presumably node1 now has the most free space and drops below it; node2 stays above).
    builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 19));
    builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 1));
    // Advance the clock by more than 60s (presumably past the monitor's reroute interval; confirm).
    currentTime.addAndGet(randomLongBetween(60001, 120000));

    monitor.onNewInfo(clusterInfo(builder));
    // The block is removed once at least one node is no longer above the high watermark.
    assertEquals(0, countBlocksCalled.get());
    assertEquals(1, countUnblockBlocksCalled.get());
}

private void assertNoLogging(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages) throws IllegalAccessException {
try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) {
mockAppender.addExpectation(
Expand Down Expand Up @@ -756,10 +852,11 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, final M
}
}

private void assertSingleWarningMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
private void assertMultipleWarningMessages(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, List<String> messages)
throws IllegalAccessException {
assertLogging(monitor, diskUsages, Level.WARN, message);
assertNoLogging(monitor, diskUsages);
for (int index = 0; index < messages.size(); index++) {
assertLogging(monitor, diskUsages, Level.WARN, messages.get(index));
}
}

private void assertSingleInfoMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
Expand Down

0 comments on commit 95990be

Please sign in to comment.