Skip to content

Commit

Permalink
Add safeguard limits for file cache during node level allocation
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Jun 21, 2023
1 parent 9c5c6eb commit 98e9df7
Show file tree
Hide file tree
Showing 17 changed files with 390 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460))
- Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230))
- Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541))
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
ClusterState state = clusterService.state();
for (ShardRouting shard : state.routingTable().allShards()) {
Expand All @@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
}
}

/**
 * Verifies that a refreshed {@link ClusterInfo} obtained from the cluster-manager node
 * carries file cache stats for two nodes, and that every reported cache has a total
 * size greater than zero.
 */
public void testClusterInfoServiceCollectsFileCacheInformation() {
    internalCluster().startNodes(1);
    internalCluster().ensureAtLeastNumSearchAndDataNodes(2);

    final InternalTestCluster testCluster = internalCluster();
    final String clusterManagerName = testCluster.getClusterManagerName();

    // The cluster info service is looked up on the cluster-manager node.
    final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) testCluster.getInstance(
        ClusterInfoService.class,
        clusterManagerName
    );
    clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));

    final ClusterInfo refreshedInfo = clusterInfoService.refresh();
    assertNotNull("info should not be null", refreshedInfo);

    final Map<String, FileCacheStats> statsByNode = refreshedInfo.nodeFileCacheStats;
    assertNotNull(statsByNode);
    assertThat("file cache is enabled on both search nodes", statsByNode.size(), Matchers.equalTo(2));

    // Each collected entry must report a non-zero total cache size.
    statsByNode.values().forEach(stats -> assertThat("file cache is non empty", stats.getTotal().getBytes(), greaterThan(0L)));
}

public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(
2,
Expand Down
24 changes: 22 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -63,9 +65,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -83,13 +86,15 @@ public ClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -105,6 +110,11 @@ public ClusterInfo(StreamInput in) throws IOException {
this.shardSizes = Collections.unmodifiableMap(sizeMap);
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();

Check warning on line 116 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L116

Added line #L116 was not covered by tests
}
}

@Override
Expand All @@ -114,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.shardSizes, StreamOutput::writeString, (o, v) -> out.writeLong(v == null ? -1 : v));
out.writeMap(this.routingToDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -187,6 +200,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
}

/**
 * Returns a read-only view of the mapping from node id to that node's file cache stats.
 * Entries exist for the nodes that have the search role assigned to them.
 *
 * @return unmodifiable map of node id to {@link FileCacheStats}
 */
public Map<String, FileCacheStats> getNodeFileCacheStats() {
    final Map<String, FileCacheStats> readOnlyView = Collections.unmodifiableMap(this.nodeFileCacheStats);
    return readOnlyView;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down Expand Up @@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
Expand All @@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
Expand Down Expand Up @@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
);
}

Expand All @@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
Expand Down Expand Up @@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
);
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);

nodeFileCacheStats = Collections.unmodifiableMap(
nodesStatsResponse.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isSearchNode())
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -329,6 +330,21 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
return shards;
}

/**
 * Collects every shard on this node for which the supplied predicate holds.
 * Shards are returned in the iteration order of this node.
 *
 * @param predicate condition a shard routing must satisfy to be included
 * @return a new list containing the matching shards; empty when none match
 */
public List<ShardRouting> shardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
    final List<ShardRouting> matching = new ArrayList<>();
    for (ShardRouting candidate : this) {
        if (predicate.test(candidate) == false) {
            continue;
        }
        matching.add(candidate);
    }
    return matching;
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
}
return LOCAL_ONLY;
}

/**
 * Tells whether both the given node and the given shard belong to the
 * {@code REMOTE_CAPABLE} routing pool.
 *
 * @param shardRouting shard whose pool is resolved through {@code getShardPool}
 * @param allocation   routing allocation passed to {@code getShardPool}
 * @param node         node whose pool is resolved through {@code getNodePool}
 * @return {@code true} only when the node pool and the shard pool are both {@code REMOTE_CAPABLE}
 */
public static boolean isShardAndNodePoolRemote(ShardRouting shardRouting, RoutingAllocation allocation, RoutingNode node) {
    // The node pool is checked first; the shard pool is only resolved for remote capable nodes.
    if (REMOTE_CAPABLE.equals(getNodePool(node)) == false) {
        return false;
    }
    return REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand All @@ -54,14 +55,17 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -167,6 +171,42 @@ public static long sizeOfRelocatingShards(
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (RoutingPool.isShardAndNodePoolRemote(shardRouting, allocation, node)) {
final Predicate<ShardRouting> shardRoutingPredicate = shard -> shard.primary()
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation));
final List<ShardRouting> remoteShardsOnNode = node.shardsSatisfyingPredicate(shardRoutingPredicate);
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
Expand Down Expand Up @@ -422,6 +462,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
if (RoutingPool.isShardAndNodePoolRemote(shardRouting, allocation, node)) {
return Decision.ALWAYS;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
/**
 * Safeguard multiplier relating remote shard data size to file cache size.
 * DiskThresholdDecider#canAllocate answers NO for a remote shard on a remote capable node
 * when the summed expected size of remote shards on that node, plus the incoming shard,
 * would exceed this ratio multiplied by the node's total file cache size in bytes.
 */
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
Expand All @@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
randomDiskUsage(),
randomShardSizes(),
randomRoutingToDataPath(),
randomReservedSpace()
randomReservedSpace(),
randomFileCacheStats()
);
BytesStreamOutput output = new BytesStreamOutput();
clusterInfo.writeTo(output);
Expand All @@ -60,6 +62,7 @@ public void testSerialization() throws Exception {
assertEquals(clusterInfo.shardSizes, result.shardSizes);
assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
}

private static Map<String, DiskUsage> randomDiskUsage() {
Expand All @@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
return builder;
}

/**
 * Builds a map with between 0 and 16 requested entries, each pairing a random
 * 16-character alphabetic key with a {@link FileCacheStats} built from seven random longs.
 * A repeated key overwrites the earlier entry, so the map may hold fewer entries than requested.
 */
private static Map<String, FileCacheStats> randomFileCacheStats() {
    final int entryCount = randomIntBetween(0, 16);
    final Map<String, FileCacheStats> statsByNode = new HashMap<>(entryCount);
    for (int remaining = entryCount; remaining > 0; remaining--) {
        // The key is drawn before the stats values, keeping the order of random draws stable.
        final String nodeKey = randomAlphaOfLength(16);
        statsByNode.put(
            nodeKey,
            new FileCacheStats(randomLong(), randomLong(), randomLong(), randomLong(), randomLong(), randomLong(), randomLong())
        );
    }
    return statsByNode;
}

private static Map<String, Long> randomShardSizes() {
int numEntries = randomIntBetween(0, 128);
final Map<String, Long> builder = new HashMap<>(numEntries);
Expand Down
Loading

0 comments on commit 98e9df7

Please sign in to comment.