Skip to content

Commit

Permalink
Add cache reservation logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Feb 24, 2023
1 parent ceb3928 commit 27e8b27
Show file tree
Hide file tree
Showing 19 changed files with 443 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

/**
 * Tells whether the given node settings include the search role.
 *
 * @param settings the node settings to inspect
 * @return {@code true} if {@link DiscoveryNodeRole#SEARCH_ROLE} is among the roles configured in {@code settings}
 */
public static boolean isSearchNode(final Settings settings) {
    return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
Expand Down Expand Up @@ -152,6 +153,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -629,4 +631,14 @@ public void apply(Settings value, Settings current, Settings previous) {

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

/**
 * Map of feature flag name to feature-flagged cluster settings. Once each feature
 * is ready for production release, the feature flag can be removed, and the
 * setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}.
 * <p>
 * Keys are the feature flag names that {@code SettingsModule} passes to
 * {@code FeatureFlags.isEnabled(...)}; the settings in the mapped list are only
 * registered when that check returns {@code true}.
 * <p>
 * NOTE(review): the value type uses the raw {@code Setting} type. Callers iterate
 * it as {@code Map.Entry<String, List<Setting>>}, so tightening it to
 * {@code Setting<?>} has to be done together with those call sites.
 */
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
// Flag guarding the searchable snapshot feature -> the node search cache size setting.
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ public SettingsModule(
registerSetting(setting);
}

// Register cluster-scoped settings that sit behind a feature flag, but only for
// flags that are currently enabled.
for (Map.Entry<String, List<Setting>> featureFlaggedSetting : ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.entrySet()) {
    if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
        featureFlaggedSetting.getValue().forEach(this::registerSetting);
    }
}

// Same for index-scoped settings. Each setting must be registered exactly once:
// the previous text registered the list twice (a lambda pass followed by a
// method-reference pass over the same list).
for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
    if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
        featureFlaggedSetting.getValue().forEach(this::registerSetting);
    }
}

Expand Down
147 changes: 146 additions & 1 deletion server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -59,6 +60,8 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -70,9 +73,15 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.Node;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,6 +113,7 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;

/**
* A component that holds all data paths for a single node.
Expand All @@ -123,14 +133,20 @@ public static class NodePath {
public final Path indicesPath;
/** Cached FileStore from path */
public final FileStore fileStore;

public final Path cachePath;
/*
Cache reserved size can default to a different value depending on configuration
*/
public ByteSizeValue cacheReservedSize;
public final int majorDeviceNumber;
public final int minorDeviceNumber;

public NodePath(Path path) throws IOException {
this.path = path;
this.indicesPath = path.resolve(INDICES_FOLDER);
this.cachePath = path.resolve(CACHE_FOLDER);
this.fileStore = Environment.getFileStore(path);
this.cacheReservedSize = ByteSizeValue.ZERO;
if (fileStore.supportsFileAttributeView("lucene")) {
this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number");
Expand Down Expand Up @@ -180,6 +196,7 @@ public String toString() {

private final Logger logger = LogManager.getLogger(NodeEnvironment.class);
private final NodePath[] nodePaths;
private final NodePath cacheNodePath;
private final Path sharedDataPath;
private final Lock[] locks;

Expand All @@ -189,6 +206,8 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private FileCache remoteStoreFileCache;

/**
* Maximum number of data nodes that should run in an environment.
*/
Expand Down Expand Up @@ -217,6 +236,7 @@ public String toString() {

public static final String NODES_FOLDER = "nodes";
public static final String INDICES_FOLDER = "indices";
public static final String CACHE_FOLDER = "cache";
public static final String NODE_LOCK_FILENAME = "node.lock";

/**
Expand Down Expand Up @@ -291,6 +311,7 @@ public void close() {
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodePaths = null;
cacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
Expand Down Expand Up @@ -342,6 +363,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
this.locks = nodeLock.locks;
this.nodePaths = nodeLock.nodePaths;
this.cacheNodePath = nodePaths[0];

initializeRemoteStoreFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand All @@ -366,6 +391,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
ensureNoShardData(nodePaths);
}

if (DiscoveryNode.isSearchNode(settings) == false) {
ensureNoCacheData(cacheNodePath);
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
success = true;
} finally {
Expand All @@ -375,6 +404,37 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

/**
 * Initializes the search file cache with a defined capacity.
 * <p>
 * Nothing happens unless the node has the search role. For a search node the capacity is read from
 * {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}; a value of {@code 0} bytes is treated as "not configured":
 * <ul>
 *   <li>if the node also has the data role, start-up fails because the size must be set explicitly;</li>
 *   <li>otherwise (dedicated search node) the capacity defaults to 80% of the space currently reported as
 *       available on the cache node path.</li>
 * </ul>
 * The resolved capacity is recorded on the cache node path as its reserved size and used to create the
 * file cache.
 *
 * @param settings the node settings
 * @throws SettingsException if the node has both the search and data roles and no cache size is configured
 */
private void initializeRemoteStoreFileCache(Settings settings) {
    if (DiscoveryNode.isSearchNode(settings) == false) {
        return;
    }

    long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();

    if (capacity == 0) {
        if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) {
            throw new SettingsException(
                "Unable to initialize the "
                    + DiscoveryNodeRole.SEARCH_ROLE.roleName()
                    + "-"
                    + DiscoveryNodeRole.DATA_ROLE.roleName()
                    + " node: Missing value for configuration "
                    + NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
            );
        }

        FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.cacheNodePath));
        long availableCapacity = info.getAvailable().getBytes();
        // 80% of the available space. Computed as (a / 100) * 80 + ((a % 100) * 80) / 100 rather than
        // 80 * a / 100: the result is identical whenever the latter does not overflow, but this form
        // cannot wrap around to a negative capacity when a very large available size is reported.
        final long defaultCachePercent = 80;
        capacity = availableCapacity / 100 * defaultCachePercent + availableCapacity % 100 * defaultCachePercent / 100;
    }

    cacheNodePath.cacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
    this.remoteStoreFileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
}

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
Expand Down Expand Up @@ -888,6 +948,17 @@ public NodePath[] nodePaths() {
return nodePaths;
}

/**
 * Gives access to the {@link NodePath} that hosts the cache folder.
 *
 * @return the node path used for caching
 * @throws IllegalStateException if this environment holds no node paths or no locks
 */
public NodePath cacheNodePath() {
    assertEnvIsLocked();
    final boolean hasLocalStorage = nodePaths != null && locks != null;
    if (hasLocalStorage == false) {
        throw new IllegalStateException("node is not configured to store local location");
    }
    return cacheNodePath;
}

public int getNodeLockId() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
Expand Down Expand Up @@ -1143,6 +1214,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
}
}

/**
 * Guards against leftover search cache content on a node that does not have the search role.
 *
 * @param cacheNodePath the node path whose cache folder is inspected
 * @throws IllegalStateException if {@link #collectCacheDataPath(NodePath)} finds any cache data
 * @throws IOException propagated from {@link #collectCacheDataPath(NodePath)}
 */
private void ensureNoCacheData(final NodePath cacheNodePath) throws IOException {
    final List<Path> leftoverCachePaths = collectCacheDataPath(cacheNodePath);
    if (leftoverCachePaths.isEmpty()) {
        return;
    }
    throw new IllegalStateException(
        String.format(
            Locale.ROOT,
            "node does not have the %s role but has data within node search cache: %s. Use 'opensearch-node repurpose' tool to clean up",
            DiscoveryNodeRole.SEARCH_ROLE.roleName(),
            leftoverCachePaths
        )
    );
}

private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException {
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePaths);
if (indexMetadataPaths.isEmpty() == false) {
Expand Down Expand Up @@ -1200,6 +1287,34 @@ private static boolean isIndexMetadataPath(Path path) {
return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME);
}

/**
 * Gathers the entries that live three levels below the cache folder of the given node path.
 * <p>
 * The walk descends through two levels of sub-directories of {@code cacheNodePath.cachePath} and
 * returns the absolute path of every entry listed inside the second-level directories. Entries that
 * are not directories at the first two levels are ignored. If the cache folder itself is not a
 * directory, the result is empty.
 * <p>
 * NOTE(review): the two directory levels presumably correspond to a node folder and an index folder,
 * making the returned entries shard data folders; confirm against the code that writes the cache.
 *
 * @param cacheNodePath the node path whose cache folder is scanned
 * @return the absolute paths found, possibly empty
 * @throws IOException if a directory cannot be opened for listing
 */
static List<Path> collectCacheDataPath(NodePath cacheNodePath) throws IOException {
    final List<Path> collected = new ArrayList<>();
    final Path cacheRoot = cacheNodePath.cachePath;
    if (Files.isDirectory(cacheRoot) == false) {
        return collected;
    }

    try (DirectoryStream<Path> firstLevel = Files.newDirectoryStream(cacheRoot)) {
        for (Path firstLevelEntry : firstLevel) {
            if (Files.isDirectory(firstLevelEntry) == false) {
                continue;
            }
            try (DirectoryStream<Path> secondLevel = Files.newDirectoryStream(firstLevelEntry)) {
                for (Path secondLevelEntry : secondLevel) {
                    if (Files.isDirectory(secondLevelEntry) == false) {
                        continue;
                    }
                    try (Stream<Path> children = Files.list(secondLevelEntry)) {
                        children.forEach(child -> collected.add(child.toAbsolutePath()));
                    }
                }
            }
        }
    }

    return collected;
}

/**
* Resolve the custom path for a index's shard.
*/
Expand Down Expand Up @@ -1306,4 +1421,34 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
 * Accessor for the {@link FileCache} of a remote search node.
 *
 * @return the file cache, or {@code null} if none has been initialized
 */
public FileCache remoteStoreFileCache() {
    return remoteStoreFileCache;
}

/**
 * Builds a snapshot of the current {@link FileCacheStats} for a remote search node.
 * <p>
 * The snapshot is stamped with {@link System#currentTimeMillis()} and combines the cache's usage,
 * capacity and statistics counters.
 *
 * @return the current statistics, or {@code null} if no file cache has been initialized
 */
public FileCacheStats remoteStoreFileCacheStats() {
    if (remoteStoreFileCache != null) {
        final CacheStats cacheStats = remoteStoreFileCache.stats();
        final CacheUsage cacheUsage = remoteStoreFileCache.usage();
        return new FileCacheStats(
            System.currentTimeMillis(),
            cacheUsage.activeUsage(),
            remoteStoreFileCache.capacity(),
            cacheUsage.usage(),
            cacheStats.evictionWeight(),
            cacheStats.removeWeight(),
            cacheStats.replaceCount(),
            cacheStats.hitCount(),
            cacheStats.missCount()
        );
    }
    return null;
}
}
Loading

0 comments on commit 27e8b27

Please sign in to comment.