diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f64434c6ac27..023a8f8ee029c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Refactor] OpenSearchException streamables to a registry ([#7646](https://github.com/opensearch-project/OpenSearch/pull/7646))
 - [Refactor] Serverless common classes to libraries ([#8065](https://github.com/opensearch-project/OpenSearch/pull/8065))
 - [Refactor] StreamIO and OpenSearchException foundation to core library ([#8035](https://github.com/opensearch-project/OpenSearch/pull/8035))
+- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))
 
 ### Deprecated
 
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
index 17e8526acfd74..508b8e21e42c1 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java
@@ -50,6 +50,7 @@
 import org.opensearch.core.common.Strings;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.SystemIndexDescriptor;
@@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
             logger.info("--> shard size: {}", size);
             assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
         }
+
+        final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
+        assertNotNull(nodeFileCacheStats);
+        assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));
+
         ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
         ClusterState state = clusterService.state();
         for (ShardRouting shard : state.routingTable().allShards()) {
@@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
         }
     }
 
+    public void testClusterInfoServiceCollectsFileCacheInformation() {
+        internalCluster().startNodes(1);
+        internalCluster().ensureAtLeastNumSearchAndDataNodes(2);
+
+        InternalTestCluster internalTestCluster = internalCluster();
+        // Get the cluster info service on the cluster-manager node
+        final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
+            ClusterInfoService.class,
+            internalTestCluster.getClusterManagerName()
+        );
+        infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
+        ClusterInfo info = infoService.refresh();
+        assertNotNull("info should not be null", info);
+        final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
+        assertNotNull(nodeFileCacheStats);
+        assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));
+
+        for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
+            assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
+        }
+    }
+
     public void testClusterInfoServiceInformationClearOnError() {
         internalCluster().startNodes(
             2,
diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
index eb3f1527ba326..c553734964204 100644
--- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
+++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java
@@ -34,6 +34,7 @@
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.opensearch.Version;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
@@ -43,6 +44,7 @@
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.index.store.StoreStats;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -64,9 +66,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
     public static final ClusterInfo EMPTY = new ClusterInfo();
     final Map<ShardRouting, String> routingToDataPath;
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
+    final Map<String, FileCacheStats> nodeFileCacheStats;
 
     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 
     /**
@@ -84,13 +87,15 @@ public ClusterInfo(
         final Map<String, DiskUsage> mostAvailableSpaceUsage,
         final Map<String, Long> shardSizes,
         final Map<ShardRouting, String> routingToDataPath,
-        final Map<NodeAndPath, ReservedSpace> reservedSpace
+        final Map<NodeAndPath, ReservedSpace> reservedSpace,
+        final Map<String, FileCacheStats> nodeFileCacheStats
     ) {
         this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
         this.shardSizes = shardSizes;
         this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
        this.routingToDataPath = routingToDataPath;
         this.reservedSpace = reservedSpace;
+        this.nodeFileCacheStats = nodeFileCacheStats;
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
@@ -110,6 +115,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         this.shardSizes = Collections.unmodifiableMap(sizeMap);
         this.routingToDataPath = Collections.unmodifiableMap(routingMap);
         this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
+        if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
+            this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
+        } else {
+            this.nodeFileCacheStats = Map.of();
+        }
     }
 
     @Override
@@ -121,6 +131,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
             out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
         }
+        if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
+            out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
+        }
     }
 
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -194,6 +207,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
         return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
     }
 
+    /**
+     * Returns a node id to file cache stats mapping for the nodes that have search roles assigned to them.
+     */
+    public Map<String, FileCacheStats> getNodeFileCacheStats() {
+        return Collections.unmodifiableMap(this.nodeFileCacheStats);
+    }
+
     /**
      * Returns the shard size for the given shard routing or null if that metric is not available.
     */
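Note on the wire-format change above: gating both the write and the read on Version.V_2_9_0 is what keeps mixed-version clusters safe. An older peer never sees the field, and a 2.9 node reading an older stream falls back to an empty map. A minimal round-trip sketch of that behavior (hypothetical test code, not part of this diff):

    // Pin both sides of the stream to a pre-2.9 version; writeTo() then skips
    // the new field and the read path falls back to Map.of().
    BytesStreamOutput out = new BytesStreamOutput();
    out.setVersion(Version.V_2_8_0);
    clusterInfo.writeTo(out);

    StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_2_8_0);
    ClusterInfo deserialized = new ClusterInfo(in);
    assert deserialized.getNodeFileCacheStats().isEmpty();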
diff --git a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
index 0acc7bece439f..9c12d6bb3e7ea 100644
--- a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
+++ b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java
@@ -59,6 +59,7 @@
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
 import org.opensearch.index.store.StoreStats;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.monitor.fs.FsInfo;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.ReceiveTimeoutTransportException;
@@ -72,6 +73,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * InternalClusterInfoService provides the ClusterInfoService interface,
@@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
 
     private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
     private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
+    private volatile Map<String, FileCacheStats> nodeFileCacheStats;
     private volatile IndicesStatsSummary indicesStatsSummary;
     // null if this node is not currently the cluster-manager
     private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
@@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
     public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
         this.leastAvailableSpaceUsages = Map.of();
         this.mostAvailableSpaceUsages = Map.of();
+        this.nodeFileCacheStats = Map.of();
         this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
         this.threadPool = threadPool;
         this.client = client;
@@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
             mostAvailableSpaceUsages,
             indicesStatsSummary.shardSizes,
             indicesStatsSummary.shardRoutingToDataPath,
-            indicesStatsSummary.reservedSpace
+            indicesStatsSummary.reservedSpace,
+            nodeFileCacheStats
         );
     }
 
@@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
         final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
         nodesStatsRequest.clear();
         nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
+        nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
         nodesStatsRequest.timeout(fetchTimeout);
         client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
         return latch;
@@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
             );
             leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
             mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);
+
+            nodeFileCacheStats = Collections.unmodifiableMap(
+                nodesStatsResponse.getNodes()
+                    .stream()
+                    .filter(nodeStats -> nodeStats.getNode().isSearchNode())
+                    .collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
+            );
         }
 
         @Override
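A note on the collector in the last hunk above: Collectors.toMap throws a NullPointerException on null values, so the isSearchNode filter is load-bearing — nodes without the search role are expected to report no file cache stats. A slightly more defensive variant would also filter null stats explicitly (a sketch under that assumption, not what this diff ships):

    final Map<String, FileCacheStats> stats = nodesStatsResponse.getNodes()
        .stream()
        // Also guard against a search node that somehow reports no cache stats.
        .filter(nodeStats -> nodeStats.getNode().isSearchNode() && nodeStats.getFileCacheStats() != null)
        .collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats));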
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
index ddd5e9274f08b..e216ca4511bff 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -54,14 +54,21 @@
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.index.Index;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.snapshots.SnapshotShardSizeInfo;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
+import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
+import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
 import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
 import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
+import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
 
 /**
  * The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -167,6 +174,42 @@ public static long sizeOfRelocatingShards(
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         ClusterInfo clusterInfo = allocation.clusterInfo();
+
+        /*
+        The following block enables allocation for remote shards within safeguard limits of the filecache.
+        */
+        if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
+            final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
+                .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
+                .collect(Collectors.toList());
+            final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
+                .map(ShardRouting::getExpectedShardSize)
+                .mapToLong(Long::longValue)
+                .sum();
+
+            final long shardSize = getExpectedShardSize(
+                shardRouting,
+                0L,
+                allocation.clusterInfo(),
+                allocation.snapshotShardSizeInfo(),
+                allocation.metadata(),
+                allocation.routingTable()
+            );
+
+            final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
+            final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
+            final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
+
+            if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
+                return allocation.decision(
+                    Decision.NO,
+                    NAME,
+                    "file cache limit reached - remote shard size will exceed configured safeguard ratio"
+                );
+            }
+            return Decision.YES;
+        }
+
         Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
         final Decision decision = earlyTerminate(allocation, usages);
         if (decision != null) {
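To make the safeguard in canAllocate concrete, here is the check with hypothetical numbers; only DATA_TO_FILE_CACHE_SIZE_RATIO (currently 5) comes from this diff:

    // A node advertising a 10 GiB file cache may hold at most 50 GiB of remote shard data.
    final long nodeCacheSize = 10L << 30;              // FileCacheStats.getTotal().getBytes()
    final long currentNodeRemoteShardSize = 46L << 30; // remote primaries already on the node
    final long shardSize = 5L << 30;                   // expected size of the incoming shard

    // 46 GiB + 5 GiB = 51 GiB > 5 * 10 GiB = 50 GiB, so the decider returns Decision.NO;
    // a shard of 4 GiB or less would still fit.
    final boolean overLimit = currentNodeRemoteShardSize + shardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize;

Note that a node with no FileCacheStats entry gets nodeCacheSize = 0, so any remote shard with a positive expected size is rejected there.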
@@ -422,6 +465,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
             throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
         }
+
+        /*
+        The following block prevents movement for remote shards since they do not use the local storage as
+        the primary source of data storage.
+        */
+        if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
+            return Decision.ALWAYS;
+        }
+
         final ClusterInfo clusterInfo = allocation.clusterInfo();
         final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
         final Decision decision = earlyTerminate(allocation, usages);
diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
index 0aa3740fb6ecb..3d23b4d22538c 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java
@@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
 
     private final CircuitBreaker circuitBreaker;
 
+    // TODO: Convert the constant into an integer setting
+    public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
+
     public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
         this.theCache = cache;
         this.circuitBreaker = circuitBreaker;
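On the TODO above DATA_TO_FILE_CACHE_SIZE_RATIO: if the constant is later promoted to a setting, it would presumably follow the standard Setting pattern, roughly like this (hypothetical key and bounds; nothing below exists in this diff):

    // Hypothetical dynamic node-scoped setting replacing the hard-coded ratio of 5.
    public static final Setting<Integer> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.intSetting(
        "cluster.filecache.remote_data_ratio", // hypothetical settings key
        5,                                     // default: the current constant
        0,                                     // minimum
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );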
diff --git a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
index a32d6e35d0182..e1294da1e57bc 100644
--- a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
+++ b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java
@@ -36,6 +36,7 @@
 import org.opensearch.cluster.routing.TestShardRouting;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.HashMap;
@@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
             randomDiskUsage(),
             randomShardSizes(),
             randomRoutingToDataPath(),
-            randomReservedSpace()
+            randomReservedSpace(),
+            randomFileCacheStats()
         );
         BytesStreamOutput output = new BytesStreamOutput();
         clusterInfo.writeTo(output);
@@ -60,6 +62,7 @@ public void testSerialization() throws Exception {
         assertEquals(clusterInfo.shardSizes, result.shardSizes);
         assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
         assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
+        assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
     }
 
     private static Map<String, DiskUsage> randomDiskUsage() {
@@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
         return builder;
     }
 
+    private static Map<String, FileCacheStats> randomFileCacheStats() {
+        int numEntries = randomIntBetween(0, 16);
+        final Map<String, FileCacheStats> builder = new HashMap<>(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            String key = randomAlphaOfLength(16);
+            FileCacheStats fileCacheStats = new FileCacheStats(
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong(),
+                randomLong()
+            );
+            builder.put(key, fileCacheStats);
+        }
+        return builder;
+    }
+
     private static Map<String, Long> randomShardSizes() {
         int numEntries = randomIntBetween(0, 128);
         final Map<String, Long> builder = new HashMap<>(numEntries);
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index 21d891bdbc317..3e21f6c19e150 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -798,7 +798,7 @@ private static ClusterInfo clusterInfo(
         final Map<String, DiskUsage> diskUsages,
         final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
     ) {
-        return new ClusterInfo(diskUsages, null, null, null, reservedSpace);
+        return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
     }
 }
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
index 7112af6b4efc0..15dcae65ce6e7 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java
@@ -176,7 +176,7 @@ public DevNullClusterInfo(
             final Map<String, Long> shardSizes,
             final Map<NodeAndPath, ReservedSpace> reservedSpace
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of());
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
index 9d7d0ebc5b2b1..dbb08a999877d 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java
@@ -239,7 +239,7 @@ public DevNullClusterInfo(
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of());
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of());
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
index c23d98c95fc3c..4ccf0a9bc3a20 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java
@@ -70,6 +70,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.Index;
 import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.store.remote.filecache.FileCacheStats;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.snapshots.EmptySnapshotsInfoService;
 import org.opensearch.snapshots.InternalSnapshotsInfoService.SnapshotShard;
@@ -83,6 +84,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
@@ -283,6 +285,190 @@ public void testDiskThreshold() {
         assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(1));
     }
 
+    public void testDiskThresholdForRemoteShards() {
+        Settings diskSettings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8)
+            .build();
+
+        Map<String, DiskUsage> usages = new HashMap<>();
"/dev/null", 100, 10)); // 90% used + usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 35)); // 65% used + usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 60)); // 40% used + + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 10L); // 10 bytes + shardSizes.put("[test][0][r]", 10L); + + Map fileCacheStatsMap = new HashMap<>(); + fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + fileCacheStatsMap.put("node3", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0)); + final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders deciders = new AllocationDeciders( + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings))) + ); + + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; + }; + AllocationService strategy = new AllocationService( + deciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + cis, + EmptySnapshotsInfoService.INSTANCE + ); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + final RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + Set defaultWithSearchRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES); + defaultWithSearchRole.add(DiscoveryNodeRole.SEARCH_ROLE); + + logger.info("--> adding two nodes"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", defaultWithSearchRole)).add(newNode("node2", defaultWithSearchRole))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + logShardStates(clusterState); + + // Primary shard should be initializing, replica should not + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logShardStates(clusterState); + // Assert that we're able to start the primary + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> adding node3"); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + logShardStates(clusterState); + // Assert that the replica is initialized now that node3 is available with enough space + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0)); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logShardStates(clusterState); + // Assert that the replica couldn't be started since node1 doesn't 
+
+    public void testFileCacheRemoteShardsDecisions() {
+        Settings diskSettings = Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%")
+            .build();
+
+        // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available.
+        final Map<String, DiskUsage> usages = new HashMap<>();
+        usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used
+        usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used
+
+        final Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test][0][p]", 40L);
+        shardSizes.put("[test][1][p]", 40L);
+        shardSizes.put("[foo][0][p]", 10L);
+
+        // The first node has a file cache size of 0; the second has 1000, greater than the shard sizes.
+        Map<String, FileCacheStats> fileCacheStatsMap = new HashMap<>();
+        fileCacheStatsMap.put("node1", new FileCacheStats(0, 0, 0, 0, 0, 0, 0));
+        fileCacheStatsMap.put("node2", new FileCacheStats(0, 0, 1000, 0, 0, 0, 0));
+
+        final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes, fileCacheStatsMap);
+
+        Set<DiscoveryNodeRole> defaultWithSearchRole = new HashSet<>(CLUSTER_MANAGER_DATA_ROLES);
+        defaultWithSearchRole.add(DiscoveryNodeRole.SEARCH_ROLE);
+
+        DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
+        Metadata metadata = Metadata.builder()
+            .put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
+            .build();
+
+        RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
+
+        DiscoveryNode discoveryNode1 = new DiscoveryNode(
+            "node1",
+            buildNewFakeTransportAddress(),
+            emptyMap(),
+            defaultWithSearchRole,
+            Version.CURRENT
+        );
+        DiscoveryNode discoveryNode2 = new DiscoveryNode(
+            "node2",
+            buildNewFakeTransportAddress(),
+            emptyMap(),
+            defaultWithSearchRole,
+            Version.CURRENT
+        );
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(discoveryNode1).add(discoveryNode2).build();
+
+        ClusterState baseClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(initialRoutingTable)
+            .nodes(discoveryNodes)
+            .build();
+
+        // Both remote primaries sit on node1, whose file cache size is zero, so no further remote shard fits there
+        ShardRouting firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
+        ShardRouting secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", null, true, ShardRoutingState.STARTED);
+        RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
+        RoutingNode secondRoutingNode = new RoutingNode("node2", discoveryNode2);
+
+        RoutingTable.Builder builder = RoutingTable.builder()
+            .add(
+                IndexRoutingTable.builder(firstRouting.index())
+                    .addIndexShard(new IndexShardRoutingTable.Builder(firstRouting.shardId()).addShard(firstRouting).build())
+                    .addIndexShard(new IndexShardRoutingTable.Builder(secondRouting.shardId()).addShard(secondRouting).build())
+            );
+        ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
+        RoutingAllocation routingAllocation = new RoutingAllocation(
+            null,
+            new RoutingNodes(clusterState),
+            clusterState,
+            clusterInfo,
+            null,
+            System.nanoTime()
+        );
+        routingAllocation.debugDecision(true);
+        Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+
+        decision = diskThresholdDecider.canAllocate(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.NO));
+
+        assertThat(
+            ((Decision.Single) decision).getExplanation(),
+            containsString("file cache limit reached - remote shard size will exceed configured safeguard ratio")
+        );
+
+        decision = diskThresholdDecider.canAllocate(firstRouting, secondRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+    }
+
     public void testDiskThresholdWithAbsoluteSizes() {
         Settings diskSettings = Settings.builder()
             .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true)
@@ -863,7 +1049,8 @@ public void testShardRelocationsTakenIntoAccount() {
                 Map.of(
                     new ClusterInfo.NodeAndPath("node1", "/dev/null"),
                     new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), between(51, 200)).build()
-                )
+                ),
+                Map.of()
             )
         );
         clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
@@ -1455,16 +1642,26 @@ static class DevNullClusterInfo extends ClusterInfo {
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes
         ) {
-            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of());
+            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of());
+        }
+
+        DevNullClusterInfo(
+            final Map<String, DiskUsage> leastAvailableSpaceUsage,
+            final Map<String, DiskUsage> mostAvailableSpaceUsage,
+            final Map<String, Long> shardSizes,
+            final Map<String, FileCacheStats> nodeFileCacheStats
+        ) {
+            this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), nodeFileCacheStats);
         }
 
         DevNullClusterInfo(
             final Map<String, DiskUsage> leastAvailableSpaceUsage,
             final Map<String, DiskUsage> mostAvailableSpaceUsage,
             final Map<String, Long> shardSizes,
-            Map<NodeAndPath, ReservedSpace> reservedSpace
+            Map<NodeAndPath, ReservedSpace> reservedSpace,
+            final Map<String, FileCacheStats> nodeFileCacheStats
         ) {
-            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
+            super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats);
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
index caab381e65e84..62c52e93aad33 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
@@ -127,7 +127,7 @@ public void testCanAllocateUsesMaxAvailableSpace() {
         final Map<String, Long> shardSizes = new HashMap<>();
         shardSizes.put("[test][0][p]", 10L); // 10 bytes
 
-        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of());
+        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of());
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
@@ -203,7 +203,7 @@ public void testCannotAllocateDueToLackOfDiskResources() {
         // way bigger than available space
         final long shardSize = randomIntBetween(110, 1000);
         shardSizes.put("[test][0][p]", shardSize);
-        ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of());
+        ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of());
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
@@ -320,7 +320,14 @@ public void testCanRemainUsesLeastAvailableSpace() {
         shardSizes.put("[test][1][p]", 10L);
         shardSizes.put("[test][2][p]", 10L);
 
-        final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, shardRoutingMap, Map.of());
+        final ClusterInfo clusterInfo = new ClusterInfo(
+            leastAvailableUsages,
+            mostAvailableUsage,
+            shardSizes,
+            shardRoutingMap,
+            Map.of(),
+            Map.of()
+        );
         RoutingAllocation allocation = new RoutingAllocation(
             new AllocationDeciders(Collections.singleton(decider)),
             clusterState.getRoutingNodes(),
diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
index 6634d1b4dbafc..6354cf18e8b62 100644
--- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@@ -132,7 +132,8 @@ class SizeFakingClusterInfo extends ClusterInfo {
             delegate.getNodeMostAvailableDiskUsages(),
             delegate.shardSizes,
             delegate.routingToDataPath,
-            delegate.reservedSpace
+            delegate.reservedSpace,
+            delegate.nodeFileCacheStats
         );
     }
 
diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
index 7722b59313b5f..ec397a2baa640 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java
@@ -109,6 +109,7 @@
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.env.TestEnvironment;
 import org.opensearch.index.Index;
+import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.analysis.AnalysisRegistry;
 import org.opensearch.index.analysis.AnalyzerScope;
@@ -1200,6 +1201,13 @@ public static Settings.Builder settings(Version version) {
         return builder;
     }
 
+    public static Settings.Builder remoteIndexSettings(Version version) {
+        Settings.Builder builder = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, version)
+            .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey());
+        return builder;
+    }
+
     /**
      * Returns size random values
     */
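Finally, a minimal usage sketch for the new remoteIndexSettings helper (hypothetical test snippet; the index name is made up). This is what the decider tests above rely on to build indices whose shards classify as REMOTE_CAPABLE:

    // Index metadata whose store type marks it as a searchable remote snapshot index.
    IndexMetadata remoteIndex = IndexMetadata.builder("my-remote-index") // hypothetical index name
        .settings(remoteIndexSettings(Version.CURRENT))
        .numberOfShards(1)
        .numberOfReplicas(0)
        .build();

    // The store type setting is the signal the routing-pool logic inspects.
    assert IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()
        .equals(remoteIndex.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()));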