Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add safeguard limits for file cache during node level allocation #8208

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))
- Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312))
- Optimize Metadata build() to skip redundant computations as part of ClusterState build ([#7853](https://github.com/opensearch-project/OpenSearch/pull/7853))
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
ClusterState state = clusterService.state();
for (ShardRouting shard : state.routingTable().allShards()) {
Expand All @@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
}
}

public void testClusterInfoServiceCollectsFileCacheInformation() {
internalCluster().startNodes(1);
internalCluster().ensureAtLeastNumSearchAndDataNodes(2);

InternalTestCluster internalTestCluster = internalCluster();
// Get the cluster info service on the cluster-manager node
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
ClusterInfoService.class,
internalTestCluster.getClusterManagerName()
);
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));

for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
}
}

public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(
2,
Expand Down
24 changes: 22 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -42,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -63,9 +65,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -83,13 +86,15 @@ public ClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -105,6 +110,11 @@ public ClusterInfo(StreamInput in) throws IOException {
this.shardSizes = Collections.unmodifiableMap(sizeMap);
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();
}
}

@Override
Expand All @@ -114,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.shardSizes, StreamOutput::writeString, (o, v) -> out.writeLong(v == null ? -1 : v));
out.writeMap(this.routingToDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -187,6 +200,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
}

/**
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to it.
*/
public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down Expand Up @@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
Expand All @@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
Expand Down Expand Up @@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
);
}

Expand All @@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
Expand Down Expand Up @@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
);
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);

nodeFileCacheStats = Collections.unmodifiableMap(
nodesStatsResponse.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isSearchNode())
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -167,6 +174,42 @@ public static long sizeOfRelocatingShards(
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
Expand Down Expand Up @@ -422,6 +465,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.ALWAYS;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
Expand All @@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
randomDiskUsage(),
randomShardSizes(),
randomRoutingToDataPath(),
randomReservedSpace()
randomReservedSpace(),
randomFileCacheStats()
);
BytesStreamOutput output = new BytesStreamOutput();
clusterInfo.writeTo(output);
Expand All @@ -60,6 +62,7 @@ public void testSerialization() throws Exception {
assertEquals(clusterInfo.shardSizes, result.shardSizes);
assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
}

private static Map<String, DiskUsage> randomDiskUsage() {
Expand All @@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
return builder;
}

private static Map<String, FileCacheStats> randomFileCacheStats() {
int numEntries = randomIntBetween(0, 16);
final Map<String, FileCacheStats> builder = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(16);
FileCacheStats fileCacheStats = new FileCacheStats(
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong()
);
builder.put(key, fileCacheStats);
}
return builder;
}

private static Map<String, Long> randomShardSizes() {
int numEntries = randomIntBetween(0, 128);
final Map<String, Long> builder = new HashMap<>(numEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ private static ClusterInfo clusterInfo(
final Map<String, DiskUsage> diskUsages,
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
) {
return new ClusterInfo(diskUsages, null, null, null, reservedSpace);
return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public DevNullClusterInfo(
final Map<String, Long> shardSizes,
final Map<NodeAndPath, ReservedSpace> reservedSpace
) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public DevNullClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes
) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of());
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of());
}

@Override
Expand Down
Loading
Loading