From 32cba615dbf0dc132f8fb6cc79faa8f16c373b0b Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Thu, 16 Feb 2023 15:11:00 -0800 Subject: [PATCH] Add cache reservation logic Signed-off-by: Kunal Kotwani --- .../cluster/node/DiscoveryNode.java | 4 + .../common/settings/ClusterSettings.java | 1 + .../org/opensearch/env/NodeEnvironment.java | 94 +++++++++++++++- .../org/opensearch/index/IndexService.java | 105 ++++++++++-------- .../org/opensearch/index/shard/ShardPath.java | 10 ++ .../org/opensearch/monitor/fs/FsInfo.java | 18 +++ .../org/opensearch/monitor/fs/FsProbe.java | 14 +++ .../org/opensearch/monitor/fs/FsService.java | 2 +- .../main/java/org/opensearch/node/Node.java | 26 +++-- .../cluster/node/DiscoveryNodeTests.java | 24 +++- .../index/shard/ShardPathTests.java | 23 +++- .../opensearch/monitor/fs/FsProbeTests.java | 35 ++++++ .../java/org/opensearch/test/NodeRoles.java | 16 +++ 13 files changed, 306 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 6c66c14241499..37b4e1686d1de 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) { return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE); } + public static boolean isSearchNode(Settings settings) { + return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE); + } + private final String nodeName; private final String nodeId; private final String ephemeralId; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 37fd8d1bb594a..5de14e2ebbf43 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -490,6 +490,7 @@ public void apply(Settings value, Settings current, Settings previous) { Node.NODE_NAME_SETTING, Node.NODE_ATTRIBUTES, Node.NODE_LOCAL_STORAGE_SETTING, + Node.NODE_SEARCH_CACHE_SIZE_SETTING, NodeRoleSettings.NODE_ROLES_SETTING, AutoCreateIndex.AUTO_CREATE_INDEX_SETTING, BaseRestHandler.MULTI_ALLOW_EXPLICIT_INDEX, diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index a29e088b2df8d..f028e7d4ed894 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -44,6 +44,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NativeFSLockFactory; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; @@ -59,6 +60,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -104,6 +106,7 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableSet; +import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING; /** * A component that holds all data paths for a single node. @@ -123,14 +126,20 @@ public static class NodePath { public final Path indicesPath; /** Cached FileStore from path */ public final FileStore fileStore; - + public final Path cachePath; + /* + Cache reserved size can default to a different value depending on configuration + */ + public ByteSizeValue cacheReservedSize; public final int majorDeviceNumber; public final int minorDeviceNumber; public NodePath(Path path) throws IOException { this.path = path; this.indicesPath = path.resolve(INDICES_FOLDER); + this.cachePath = path.resolve(CACHE_FOLDER); this.fileStore = Environment.getFileStore(path); + this.cacheReservedSize = ByteSizeValue.ZERO; if (fileStore.supportsFileAttributeView("lucene")) { this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number"); this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number"); @@ -180,6 +189,7 @@ public String toString() { private final Logger logger = LogManager.getLogger(NodeEnvironment.class); private final NodePath[] nodePaths; + private final NodePath cacheNodePath; private final Path sharedDataPath; private final Lock[] locks; @@ -215,8 +225,11 @@ public String toString() { Property.NodeScope ); + private static final ByteSizeValue SEARCH_CACHE_DEFAULT_SIZE = new ByteSizeValue(100, ByteSizeUnit.MB); + public static final String NODES_FOLDER = "nodes"; public static final String INDICES_FOLDER = "indices"; + public static final String CACHE_FOLDER = "cache"; public static final String NODE_LOCK_FILENAME = "node.lock"; /** @@ -291,6 +304,7 @@ public void close() { public NodeEnvironment(Settings settings, Environment environment) throws IOException { if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { nodePaths = null; + cacheNodePath = null; sharedDataPath = null; locks = null; nodeLockId = -1; @@ -342,6 +356,25 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce } this.locks = nodeLock.locks; this.nodePaths = nodeLock.nodePaths; + this.cacheNodePath = nodePaths[0]; + + /* + The following block initializes the search cache based on user configuration. + If the user doesn't configure the cache size, we default to 100 MB since we still + need to cache files for index querying. + */ + if (DiscoveryNode.isSearchNode(settings)) { + long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); + + if (capacity == 0) { + FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.cacheNodePath)); + long availableCapacity = info.getTotal().getBytes(); + capacity = Math.min(availableCapacity, SEARCH_CACHE_DEFAULT_SIZE.getBytes()); + } + + cacheNodePath.cacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES); + } + this.nodeLockId = nodeLock.nodeId; if (logger.isDebugEnabled()) { @@ -366,6 +399,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce ensureNoShardData(nodePaths); } + if (DiscoveryNode.isSearchNode(settings) == false) { + ensureNoCacheData(cacheNodePath); + } + this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths); success = true; } finally { @@ -888,6 +925,17 @@ public NodePath[] nodePaths() { return nodePaths; } + /** + * Returns the {@link NodePath} used for caching. + */ + public NodePath cacheNodePath() { + assertEnvIsLocked(); + if (nodePaths == null || locks == null) { + throw new IllegalStateException("node is not configured to store local location"); + } + return cacheNodePath; + } + public int getNodeLockId() { assertEnvIsLocked(); if (nodePaths == null || locks == null) { @@ -1143,6 +1191,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException { } } + /** + * Throws an exception if cache exists on a non-search node. + */ + private void ensureNoCacheData(final NodePath cacheNodePath) throws IOException { + List cacheDataPaths = collectCacheDataPath(cacheNodePath); + if (cacheDataPaths.isEmpty() == false) { + final String message = String.format( + Locale.ROOT, + "node does not have the %s role but has shard data: %s. Use 'opensearch-node repurpose' tool to clean up", + DiscoveryNodeRole.SEARCH_ROLE.roleName(), + cacheDataPaths + ); + throw new IllegalStateException(message); + } + } + private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException { List indexMetadataPaths = collectIndexMetadataPaths(nodePaths); if (indexMetadataPaths.isEmpty() == false) { @@ -1200,6 +1264,34 @@ private static boolean isIndexMetadataPath(Path path) { return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME); } + /** + * Collect the path containing cache data in the indicated cache node path. + * The returned paths will point to the shard data folder. + */ + static List collectCacheDataPath(NodePath cacheNodePath) throws IOException { + List indexSubPaths = new ArrayList<>(); + Path cachePath = cacheNodePath.cachePath; + if (Files.isDirectory(cachePath)) { + try (DirectoryStream nodeStream = Files.newDirectoryStream(cachePath)) { + for (Path nodePath : nodeStream) { + if (Files.isDirectory(nodePath)) { + try (DirectoryStream indexStream = Files.newDirectoryStream(nodePath)) { + for (Path indexPath : indexStream) { + if (Files.isDirectory(indexPath)) { + try (Stream shardStream = Files.list(indexPath)) { + shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add); + } + } + } + } + } + } + } + } + + return indexSubPaths; + } + /** * Resolve the custom path for a index's shard. */ diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 78211f12f71ad..6589297db1543 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -458,53 +458,9 @@ public synchronized IndexShard createShard( try { lock = nodeEnv.shardLock(shardId, "starting shard", TimeUnit.SECONDS.toMillis(5)); eventListener.beforeIndexShardCreated(shardId, indexSettings); - ShardPath path; - try { - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); - } catch (IllegalStateException ex) { - logger.warn("{} failed to load shard path, trying to remove leftover", shardId); - try { - ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); - } catch (Exception inner) { - ex.addSuppressed(inner); - throw ex; - } - } - - if (path == null) { - // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard - // that's being relocated/replicated we know how large it will become once it's done copying: - // Count up how many shards are currently on each data path: - Map dataPathToShardCount = new HashMap<>(); - for (IndexShard shard : this) { - Path dataPath = shard.shardPath().getRootStatePath(); - Integer curCount = dataPathToShardCount.get(dataPath); - if (curCount == null) { - curCount = 0; - } - dataPathToShardCount.put(dataPath, curCount + 1); - } - path = ShardPath.selectNewPathForShard( - nodeEnv, - shardId, - this.indexSettings, - routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE - ? getAvgShardSizeInBytes() - : routing.getExpectedShardSize(), - dataPathToShardCount - ); - logger.debug("{} creating using a new path [{}]", shardId, path); - } else { - logger.debug("{} creating using an existing path [{}]", shardId, path); - } - - if (shards.containsKey(shardId.id())) { - throw new IllegalStateException(shardId + " already exists"); - } - + ShardPath path = getShardPath(routing, shardId, lock); logger.debug("creating shard_id {}", shardId); - // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. + // if we are on a shared FS we only own the shard (i.e. we can safely delete it) if we are the primary. final Engine.Warmer engineWarmer = (reader) -> { IndexShard shard = getShardOrNull(shardId.getId()); if (shard != null) { @@ -573,6 +529,63 @@ public synchronized IndexShard createShard( } } + /* + Fetches the shard path based on the index type - + For a remote snapshot index, the cache path is used to initialize the shards. + For a local index, a local shard path is loaded or a new path is calculated. + */ + private ShardPath getShardPath(ShardRouting routing, ShardId shardId, ShardLock lock) throws IOException { + ShardPath path; + if (this.indexSettings.isRemoteSnapshot()) { + path = ShardPath.loadCachePath(nodeEnv, shardId); + } else { + try { + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); + } catch (IllegalStateException ex) { + logger.warn("{} failed to load shard path, trying to remove leftover", shardId); + try { + ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); + } catch (Exception inner) { + ex.addSuppressed(inner); + throw ex; + } + } + + if (path == null) { + // TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard + // that's being relocated/replicated we know how large it will become once it's done copying: + // Count up how many shards are currently on each data path: + Map dataPathToShardCount = new HashMap<>(); + for (IndexShard shard : this) { + Path dataPath = shard.shardPath().getRootStatePath(); + Integer curCount = dataPathToShardCount.get(dataPath); + if (curCount == null) { + curCount = 0; + } + dataPathToShardCount.put(dataPath, curCount + 1); + } + path = ShardPath.selectNewPathForShard( + nodeEnv, + shardId, + this.indexSettings, + routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE + ? getAvgShardSizeInBytes() + : routing.getExpectedShardSize(), + dataPathToShardCount + ); + logger.debug("{} creating using a new path [{}]", shardId, path); + } else { + logger.debug("{} creating using an existing path [{}]", shardId, path); + } + } + + if (shards.containsKey(shardId.id())) { + throw new IllegalStateException(shardId + " already exists"); + } + return path; + } + @Override public synchronized void removeShard(int shardId, String reason) { final ShardId sId = new ShardId(index(), shardId); diff --git a/server/src/main/java/org/opensearch/index/shard/ShardPath.java b/server/src/main/java/org/opensearch/index/shard/ShardPath.java index 9960326e914e4..9c61bab47b815 100644 --- a/server/src/main/java/org/opensearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/opensearch/index/shard/ShardPath.java @@ -130,6 +130,16 @@ public boolean isCustomDataPath() { return isCustomDataPath; } + /** + * Returns the shard path to be stored within the cache on the search capable node. + */ + public static ShardPath loadCachePath(NodeEnvironment env, ShardId shardId) { + NodeEnvironment.NodePath path = env.cacheNodePath(); + final Path dataPath = env.resolveCustomLocation(path.cachePath.toString(), shardId); + final Path statePath = path.resolve(shardId); + return new ShardPath(true, dataPath, statePath, shardId); + } + /** * This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple * directories with a valid shard state exist the one with the highest version will be used. diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 87a88c11f74e5..0908faa5bde95 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -32,6 +32,7 @@ package org.opensearch.monitor.fs; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -70,6 +71,7 @@ public static class Path implements Writeable, ToXContentObject { long total = -1; long free = -1; long available = -1; + long cacheReserved = -1; public Path() {} @@ -91,6 +93,9 @@ public Path(StreamInput in) throws IOException { total = in.readLong(); free = in.readLong(); available = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + cacheReserved = in.readLong(); + } } @Override @@ -101,6 +106,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(total); out.writeLong(free); out.writeLong(available); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeLong(cacheReserved); + } } public String getPath() { @@ -127,6 +135,10 @@ public ByteSizeValue getAvailable() { return new ByteSizeValue(available); } + public ByteSizeValue getReserved() { + return new ByteSizeValue(cacheReserved); + } + private long addLong(long current, long other) { if (current == -1 && other == -1) { return 0; @@ -143,6 +155,7 @@ private long addLong(long current, long other) { public void add(Path path) { total = FsProbe.adjustForHugeFilesystems(addLong(total, path.total)); free = FsProbe.adjustForHugeFilesystems(addLong(free, path.free)); + cacheReserved = FsProbe.adjustForHugeFilesystems(addLong(cacheReserved, path.cacheReserved)); available = FsProbe.adjustForHugeFilesystems(addLong(available, path.available)); } @@ -156,6 +169,8 @@ static final class Fields { static final String FREE_IN_BYTES = "free_in_bytes"; static final String AVAILABLE = "available"; static final String AVAILABLE_IN_BYTES = "available_in_bytes"; + static final String CACHE_RESERVED = "cache_reserved"; + static final String CACHE_RESERVED_IN_BYTES = "cache_reserved_in_bytes"; } @Override @@ -180,6 +195,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (available != -1) { builder.humanReadableField(Fields.AVAILABLE_IN_BYTES, Fields.AVAILABLE, getAvailable()); } + if (cacheReserved != -1) { + builder.humanReadableField(Fields.CACHE_RESERVED_IN_BYTES, Fields.CACHE_RESERVED, getReserved()); + } builder.endObject(); return builder; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index a196a449fa10a..a377f89ab3304 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -36,9 +36,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Constants; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment.NodePath; @@ -62,8 +65,15 @@ public class FsProbe { private final NodeEnvironment nodeEnv; + private final Settings settings; + public FsProbe(NodeEnvironment nodeEnv) { + this(nodeEnv, null); + } + + public FsProbe(NodeEnvironment nodeEnv, Settings settings) { this.nodeEnv = nodeEnv; + this.settings = settings; } public FsInfo stats(FsInfo previous) throws IOException { @@ -74,6 +84,10 @@ public FsInfo stats(FsInfo previous) throws IOException { FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length]; for (int i = 0; i < dataLocations.length; i++) { paths[i] = getFSInfo(dataLocations[i]); + if (settings != null && DiscoveryNode.isSearchNode(settings) && dataLocations[i].cacheReservedSize != ByteSizeValue.ZERO) { + paths[i].cacheReserved = dataLocations[i].cacheReservedSize.getBytes(); + paths[i].available -= paths[i].cacheReserved; + } } FsInfo.IoStats ioStats = null; if (Constants.LINUX) { diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsService.java b/server/src/main/java/org/opensearch/monitor/fs/FsService.java index 728a6d7f0b36d..f0cd1eb94c73b 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsService.java @@ -70,7 +70,7 @@ public class FsService { ); public FsService(final Settings settings, final NodeEnvironment nodeEnvironment) { - final FsProbe probe = new FsProbe(nodeEnvironment); + final FsProbe probe = new FsProbe(nodeEnvironment, settings); final FsInfo initialValue = stats(probe, null); if (ALWAYS_REFRESH_SETTING.get(settings)) { assert REFRESH_INTERVAL_SETTING.exists(settings) == false; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a9574307500aa..dd2c46979c378 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.common.SetOnce; +import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexModule; @@ -47,8 +48,6 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.NoopExtensionsManager; -import org.opensearch.monitor.fs.FsInfo; -import org.opensearch.monitor.fs.FsProbe; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.TaskResourceTrackingService; @@ -60,7 +59,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; @@ -316,6 +314,12 @@ public class Node implements Closeable { } }, Setting.Property.NodeScope); + public static final Setting NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting( + "node.search.cache.size", + ByteSizeValue.ZERO, + Property.NodeScope + ); + private static final String CLIENT_TYPE = "node"; /** @@ -625,8 +629,12 @@ protected Node( final Collection>> engineFactoryProviders = enginePlugins.stream() .map(plugin -> (Function>) plugin::getEngineFactory) .collect(Collectors.toList()); - // TODO: for now this is a single cache, later, this should read node and index settings - final FileCache remoteStoreFileCache = createRemoteStoreFileCache(); + + FileCache remoteStoreFileCache = null; + if (DiscoveryNode.isSearchNode(settings)) { + remoteStoreFileCache = createRemoteStoreFileCache(); + } + final Map builtInDirectoryFactories = IndexModule.createBuiltInDirectoryFactories( repositoriesServiceReference::get, threadPool, @@ -1128,13 +1136,7 @@ protected Node( } private FileCache createRemoteStoreFileCache() { - // TODO: implement more custom logic to create named caches, using multiple node paths, more capacity computation options and - // capacity reservation logic - FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(nodeEnvironment.nodePaths()[0])); - long diskCapacity = info.getTotal().getBytes(); - // hard coded as 50% for now - long capacity = (long) (diskCapacity * 0.50); - return FileCacheFactory.createConcurrentLRUFileCache(capacity); + return FileCacheFactory.createConcurrentLRUFileCache(nodeEnvironment.cacheNodePath().cacheReservedSize.getBytes()); } protected TransportService newTransportService( diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java index 4f16e7526c9ea..0232f671f86de 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeTests.java @@ -50,12 +50,14 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.opensearch.test.NodeRoles.nonRemoteClusterClientNode; -import static org.opensearch.test.NodeRoles.remoteClusterClientNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; +import static org.opensearch.test.NodeRoles.nonRemoteClusterClientNode; +import static org.opensearch.test.NodeRoles.remoteClusterClientNode; +import static org.opensearch.test.NodeRoles.searchNode; +import static org.opensearch.test.NodeRoles.nonSearchNode; public class DiscoveryNodeTests extends OpenSearchTestCase { @@ -175,6 +177,14 @@ public void testDiscoveryNodeIsRemoteClusterClientUnset() { runTestDiscoveryNodeIsRemoteClusterClient(nonRemoteClusterClientNode(), false); } + public void testDiscoveryNodeIsSearchSet() { + runTestDiscoveryNodeIsSearch(searchNode(), true); + } + + public void testDiscoveryNodeIsSearchUnset() { + runTestDiscoveryNodeIsSearch(nonSearchNode(), false); + } + // Added in 2.0 temporarily, validate the MASTER_ROLE is in the list of known roles. // MASTER_ROLE was removed from BUILT_IN_ROLES and is imported by setDeprecatedMasterRole(), // as a workaround for making the new CLUSTER_MANAGER_ROLE has got the same abbreviation 'm'. @@ -194,6 +204,16 @@ private void runTestDiscoveryNodeIsRemoteClusterClient(final Settings settings, } } + private void runTestDiscoveryNodeIsSearch(final Settings settings, final boolean expected) { + final DiscoveryNode node = DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node"); + assertThat(node.isSearchNode(), equalTo(expected)); + if (expected) { + assertThat(node.getRoles(), hasItem(DiscoveryNodeRole.SEARCH_ROLE)); + } else { + assertThat(node.getRoles(), not(hasItem(DiscoveryNodeRole.SEARCH_ROLE))); + } + } + public void testGetRoleFromRoleNameIsCaseInsensitive() { String dataRoleName = "DATA"; DiscoveryNodeRole dataNodeRole = DiscoveryNode.getRoleFromRoleName(dataRoleName); diff --git a/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java b/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java index 25ec7c7987855..4102cbc32c9f7 100644 --- a/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ShardPathTests.java @@ -33,7 +33,6 @@ import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; -import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.Index; @@ -44,6 +43,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; +import static org.opensearch.env.Environment.PATH_SHARED_DATA_SETTING; public class ShardPathTests extends OpenSearchTestCase { public void testLoadShardPath() throws IOException { @@ -109,9 +109,7 @@ public void testGetRootPaths() throws IOException { if (useCustomDataPath) { final Path path = createTempDir(); customDataPath = "custom"; - nodeSettings = Settings.builder() - .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath()) - .build(); + nodeSettings = Settings.builder().put(PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath()).build(); customPath = path.resolve("custom").resolve("0"); } else { customPath = null; @@ -149,6 +147,23 @@ public void testGetRootPaths() throws IOException { } } + public void testLoadCachePath() throws IOException { + Settings searchNodeSettings = Settings.builder().put("node.roles", "search").put(PATH_SHARED_DATA_SETTING.getKey(), "").build(); + + try (NodeEnvironment env = newNodeEnvironment(searchNodeSettings)) { + ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); + Path path = env.cacheNodePath().cachePath; + writeShardStateMetadata("0xDEADBEEF", path); + ShardPath shardPath = ShardPath.loadCachePath(env, shardId); + + assertTrue(shardPath.getDataPath().startsWith(path)); + assertFalse(shardPath.getShardStatePath().startsWith(path)); + + assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); + assertEquals("foo", shardPath.getShardId().getIndexName()); + } + } + private static void writeShardStateMetadata(String indexUUID, Path... paths) throws WriteStateException { ShardStateMetadata.FORMAT.writeAndCleanup( new ShardStateMetadata(true, indexUUID, AllocationId.newInitializing(), ShardStateMetadata.IndexDataLocation.LOCAL), diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java index b70b76c23fb96..472b631104853 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsProbeTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.Constants; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment.NodePath; import org.opensearch.test.OpenSearchTestCase; @@ -107,6 +108,40 @@ public void testFsInfo() throws IOException { } } + public void testFsCacheInfo() throws IOException { + + try (NodeEnvironment env = newNodeEnvironment()) { + + Settings settings = Settings.builder().put("node.roles", "search").build(); + FsProbe probe = new FsProbe(env, settings); + + FsInfo stats = probe.stats(null); + assertNotNull(stats); + assertTrue(stats.getTimestamp() > 0L); + FsInfo.Path total = stats.getTotal(); + assertNotNull(total); + assertTrue(total.total > 0L); + assertTrue(total.free > 0L); + assertTrue(total.available > 0L); + assertTrue(total.cacheReserved > 0L); + assertEquals(total.free - total.available, total.cacheReserved); + + for (FsInfo.Path path : stats) { + assertNotNull(path); + assertFalse(path.getPath().isEmpty()); + assertFalse(path.getMount().isEmpty()); + assertFalse(path.getType().isEmpty()); + assertTrue(path.total > 0L); + assertTrue(path.free > 0L); + assertTrue(path.available > 0L); + + if (path.cacheReserved > -1L) { + assertEquals(path.free - path.available, path.cacheReserved); + } + } + } + } + public void testFsInfoOverflow() throws Exception { final FsInfo.Path pathStats = new FsInfo.Path( "/foo/bar", diff --git a/test/framework/src/main/java/org/opensearch/test/NodeRoles.java b/test/framework/src/main/java/org/opensearch/test/NodeRoles.java index 958b6c81def34..4285ac76fc4d4 100644 --- a/test/framework/src/main/java/org/opensearch/test/NodeRoles.java +++ b/test/framework/src/main/java/org/opensearch/test/NodeRoles.java @@ -244,4 +244,20 @@ public static Settings nonRemoteClusterClientNode(final Settings settings) { return removeRoles(settings, Collections.singleton(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)); } + public static Settings searchNode() { + return searchNode(Settings.EMPTY); + } + + public static Settings searchNode(final Settings settings) { + return addRoles(settings, Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)); + } + + public static Settings nonSearchNode() { + return nonSearchNode(Settings.EMPTY); + } + + public static Settings nonSearchNode(final Settings settings) { + return removeRoles(settings, Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)); + } + }