Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache reservation logic #6350

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,15 @@ public void testFileCacheStats() throws Exception {
final Client client = client();
final int numNodes = 2;

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
internalCluster().ensureAtLeastNumDataNodes(numNodes);
createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1);
deleteIndicesAndEnsureGreen(client, indexName1);
assertAllNodesFileCacheEmpty();

internalCluster().ensureAtLeastNumSearchNodes(numNodes);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertNodesFileCacheNonEmpty(numNodes);
}
Expand All @@ -440,20 +441,23 @@ private void assertAllNodesFileCacheEmpty() {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
assertTrue(isFileCacheEmpty(fcstats));
assertNull(fcstats);
}
}

private void assertNodesFileCacheNonEmpty(int numNodes) {
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
int nonEmptyFileCacheNodes = 0;
for (NodeStats stats : response.getNodes()) {
FileCacheStats fcstats = stats.getFileCacheStats();
assertNotNull(fcstats);
if (!isFileCacheEmpty(fcstats)) {
nonEmptyFileCacheNodes++;
FileCacheStats fcStats = stats.getFileCacheStats();
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
if (stats.getNode().isSearchNode()) {
if (!isFileCacheEmpty(fcStats)) {
nonEmptyFileCacheNodes++;
}
} else {
assertNull(fcStats);
}

}
assertEquals(numNodes, nonEmptyFileCacheNodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

/**
 * Checks whether the given settings configure the node with the search role.
 *
 * @param settings the node settings to inspect
 * @return the result of {@code hasRole} for {@link DiscoveryNodeRole#SEARCH_ROLE}
 */
public static boolean isSearchNode(final Settings settings) {
    return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
Expand Down Expand Up @@ -152,6 +153,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -629,4 +631,14 @@ public void apply(Settings value, Settings current, Settings previous) {

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

/**
* Map of feature flag name to feature-flagged cluster settings. Once each feature
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ public SettingsModule(
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : ClusterSettings.FEATURE_FLAGGED_CLUSTER_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
featureFlaggedSetting.getValue().forEach(this::registerSetting);
}
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
featureFlaggedSetting.getValue().forEach(feature -> registerSetting(feature));
featureFlaggedSetting.getValue().forEach(this::registerSetting);
}
}

Expand Down
150 changes: 149 additions & 1 deletion server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -59,6 +60,8 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand All @@ -70,9 +73,15 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.stats.CacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.Node;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,6 +113,7 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;

/**
* A component that holds all data paths for a single node.
Expand All @@ -123,14 +133,20 @@ public static class NodePath {
public final Path indicesPath;
/** Cached FileStore from path */
public final FileStore fileStore;

public final Path fileCachePath;
/*
Cache reserved size can default to a different value depending on configuration
*/
public ByteSizeValue fileCacheReservedSize;
public final int majorDeviceNumber;
public final int minorDeviceNumber;

public NodePath(Path path) throws IOException {
this.path = path;
this.indicesPath = path.resolve(INDICES_FOLDER);
this.fileCachePath = path.resolve(CACHE_FOLDER);
this.fileStore = Environment.getFileStore(path);
this.fileCacheReservedSize = ByteSizeValue.ZERO;
if (fileStore.supportsFileAttributeView("lucene")) {
this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number");
Expand Down Expand Up @@ -180,6 +196,7 @@ public String toString() {

private final Logger logger = LogManager.getLogger(NodeEnvironment.class);
private final NodePath[] nodePaths;
private final NodePath fileCacheNodePath;
private final Path sharedDataPath;
private final Lock[] locks;

Expand All @@ -189,6 +206,8 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private FileCache fileCache;

/**
* Maximum number of data nodes that should run in an environment.
*/
Expand Down Expand Up @@ -217,6 +236,7 @@ public String toString() {

public static final String NODES_FOLDER = "nodes";
public static final String INDICES_FOLDER = "indices";
public static final String CACHE_FOLDER = "cache";
public static final String NODE_LOCK_FILENAME = "node.lock";

/**
Expand Down Expand Up @@ -291,6 +311,7 @@ public void close() {
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
Expand Down Expand Up @@ -342,6 +363,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
this.locks = nodeLock.locks;
this.nodePaths = nodeLock.nodePaths;
this.fileCacheNodePath = nodePaths[0];

initializeFileCache(settings);

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand All @@ -366,6 +391,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
ensureNoShardData(nodePaths);
}

if (DiscoveryNode.isSearchNode(settings) == false) {
ensureNoFileCacheData(fileCacheNodePath);
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
success = true;
} finally {
Expand All @@ -375,6 +404,40 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

/**
 * Initializes the file cache with a defined capacity when the node has the search role;
 * does nothing for any other node.
 * <p>
 * The capacity is taken from {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}. A value of zero is
 * treated as "not configured":
 * <ul>
 *   <li>if the node has any role other than the search role, initialization fails;</li>
 *   <li>otherwise (dedicated search node) the capacity defaults to 80% of the space
 *       currently available on the file cache node path.</li>
 * </ul>
 * In every case the resulting capacity is capped at the available space, recorded on the
 * file cache node path as its reserved size, and used to create the cache.
 *
 * @param settings the node settings
 * @throws SettingsException if the cache size is not configured and the node is not a dedicated search node
 */
private void initializeFileCache(Settings settings) {
    if (DiscoveryNode.isSearchNode(settings)) {
        long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
        FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.fileCacheNodePath));
        long availableCapacity = info.getAvailable().getBytes();

        // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
        if (capacity == 0) {
            // Without an explicit size, only a dedicated search node may fall back to a default
            if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
                throw new SettingsException(
                    "Unable to initialize the "
                        + DiscoveryNodeRole.SEARCH_ROLE.roleName()
                        + "-"
                        + DiscoveryNodeRole.DATA_ROLE.roleName()
                        + " node: Missing value for configuration "
                        + NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
                );
            } else {
                // 80% of the available space. Computed on quotient and remainder separately so the
                // intermediate product cannot overflow a long when a very large amount of free
                // space is reported; the result is identical to 80 * availableCapacity / 100.
                capacity = (availableCapacity / 100) * 80 + (availableCapacity % 100) * 80 / 100;
            }
        }
        // Never reserve more than what is actually available on disk
        capacity = Math.min(capacity, availableCapacity);
        fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
        this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
    }
}

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
Expand Down Expand Up @@ -888,6 +951,17 @@ public NodePath[] nodePaths() {
return nodePaths;
}

/**
 * Returns the {@link NodePath} that backs the file cache.
 *
 * @return the node path used for file caching
 * @throws IllegalStateException if this environment has no node paths or no locks
 */
public NodePath fileCacheNodePath() {
    assertEnvIsLocked();
    final boolean hasLocalStorage = nodePaths != null && locks != null;
    if (hasLocalStorage == false) {
        throw new IllegalStateException("node is not configured to store local location");
    }
    return fileCacheNodePath;
}

public int getNodeLockId() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
Expand Down Expand Up @@ -1143,6 +1217,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
}
}

/**
 * Verifies that the given file cache node path holds no cache data.
 * Called for nodes that do not have the search role.
 *
 * @param fileCacheNodePath the node path whose cache folder is inspected
 * @throws IllegalStateException if any cache data path is found
 * @throws IOException if the cache folder cannot be listed
 */
private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException {
    final List<Path> leftoverCachePaths = collectFileCacheDataPath(fileCacheNodePath);
    if (leftoverCachePaths.isEmpty()) {
        // Nothing left behind in the cache folder
        return;
    }
    throw new IllegalStateException(
        String.format(
            Locale.ROOT,
            "node does not have the %s role but has data within node search cache: %s. Use 'opensearch-node repurpose' tool to clean up",
            DiscoveryNodeRole.SEARCH_ROLE.roleName(),
            leftoverCachePaths
        )
    );
}

private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException {
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePaths);
if (indexMetadataPaths.isEmpty() == false) {
Expand Down Expand Up @@ -1200,6 +1290,34 @@ private static boolean isIndexMetadataPath(Path path) {
return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME);
}

/**
 * Collects the cache data paths found below the cache folder of the given node path.
 * <p>
 * The cache folder is walked two directory levels deep, and every entry found inside a
 * second-level directory is returned as an absolute path. Non-directory entries on the
 * first two levels are ignored. If the cache folder is not a directory, the result is empty.
 *
 * @param fileCacheNodePath the node path whose {@code fileCachePath} is scanned
 * @return the absolute paths of all entries on the third level; never {@code null}
 * @throws IOException if a directory cannot be opened for listing
 */
static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
    final List<Path> collected = new ArrayList<>();
    final Path cacheRoot = fileCacheNodePath.fileCachePath;
    if (Files.isDirectory(cacheRoot) == false) {
        return collected;
    }

    try (DirectoryStream<Path> firstLevel = Files.newDirectoryStream(cacheRoot)) {
        for (Path firstLevelEntry : firstLevel) {
            if (Files.isDirectory(firstLevelEntry) == false) {
                continue;
            }
            try (DirectoryStream<Path> secondLevel = Files.newDirectoryStream(firstLevelEntry)) {
                for (Path secondLevelEntry : secondLevel) {
                    if (Files.isDirectory(secondLevelEntry) == false) {
                        continue;
                    }
                    // Every child of a second-level directory counts as cache data
                    try (Stream<Path> children = Files.list(secondLevelEntry)) {
                        children.forEach(child -> collected.add(child.toAbsolutePath()));
                    }
                }
            }
        }
    }
    return collected;
}

/**
* Resolve the custom path for a index's shard.
*/
Expand Down Expand Up @@ -1306,4 +1424,34 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
 * Gives access to the {@link FileCache} held by this node environment.
 *
 * @return the file cache, or {@code null} when none has been initialized
 */
public FileCache fileCache() {
    return fileCache;
}

/**
 * Builds a snapshot of the current {@link FileCacheStats} for this node.
 *
 * @return the stats stamped with the current wall-clock time, or {@code null}
 *         when this node environment holds no file cache
 */
public FileCacheStats fileCacheStats() {
    if (fileCache != null) {
        final CacheStats cacheStats = fileCache.stats();
        final CacheUsage cacheUsage = fileCache.usage();
        return new FileCacheStats(
            System.currentTimeMillis(),
            cacheUsage.activeUsage(),
            fileCache.capacity(),
            cacheUsage.usage(),
            cacheStats.evictionWeight(),
            cacheStats.removeWeight(),
            cacheStats.replaceCount(),
            cacheStats.hitCount(),
            cacheStats.missCount()
        );
    }
    // No cache was set up, so there is nothing to report
    return null;
}
}
Loading