Skip to content

Commit

Permalink
Add cache reservation logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Feb 17, 2023
1 parent 1cdff3b commit e24ab81
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public static boolean isRemoteClusterClient(final Settings settings) {
return hasRole(settings, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}

public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ public void apply(Settings value, Settings current, Settings previous) {
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
Node.NODE_LOCAL_STORAGE_SETTING,
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
NodeRoleSettings.NODE_ROLES_SETTING,
AutoCreateIndex.AUTO_CREATE_INDEX_SETTING,
BaseRestHandler.MULTI_ALLOW_EXPLICIT_INDEX,
Expand Down
94 changes: 93 additions & 1 deletion server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -104,6 +106,7 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;

/**
* A component that holds all data paths for a single node.
Expand All @@ -123,14 +126,20 @@ public static class NodePath {
public final Path indicesPath;
/** Cached FileStore from path */
public final FileStore fileStore;

public final Path cachePath;
/*
Cache reserved size can default to a different value depending on configuration
*/
public ByteSizeValue cacheReservedSize;
public final int majorDeviceNumber;
public final int minorDeviceNumber;

public NodePath(Path path) throws IOException {
this.path = path;
this.indicesPath = path.resolve(INDICES_FOLDER);
this.cachePath = path.resolve(CACHE_FOLDER);
this.fileStore = Environment.getFileStore(path);
this.cacheReservedSize = ByteSizeValue.ZERO;
if (fileStore.supportsFileAttributeView("lucene")) {
this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number");
Expand Down Expand Up @@ -180,6 +189,7 @@ public String toString() {

private final Logger logger = LogManager.getLogger(NodeEnvironment.class);
private final NodePath[] nodePaths;
private final NodePath cacheNodePath;
private final Path sharedDataPath;
private final Lock[] locks;

Expand Down Expand Up @@ -215,8 +225,11 @@ public String toString() {
Property.NodeScope
);

private static final ByteSizeValue SEARCH_CACHE_DEFAULT_SIZE = new ByteSizeValue(100, ByteSizeUnit.MB);

public static final String NODES_FOLDER = "nodes";
public static final String INDICES_FOLDER = "indices";
public static final String CACHE_FOLDER = "cache";
public static final String NODE_LOCK_FILENAME = "node.lock";

/**
Expand Down Expand Up @@ -291,6 +304,7 @@ public void close() {
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodePaths = null;
cacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
Expand Down Expand Up @@ -342,6 +356,25 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
this.locks = nodeLock.locks;
this.nodePaths = nodeLock.nodePaths;
this.cacheNodePath = nodePaths[0];

/*
The following block initializes the search cache based on user configuration.
If the user doesn't configure the cache size, we default to 100 MB since we still
need to cache files for index querying.
*/
if (DiscoveryNode.isSearchNode(settings)) {
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();

if (capacity == 0) {
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(this.cacheNodePath));
long availableCapacity = info.getTotal().getBytes();
capacity = Math.min(availableCapacity, SEARCH_CACHE_DEFAULT_SIZE.getBytes());
}

cacheNodePath.cacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
}

this.nodeLockId = nodeLock.nodeId;

if (logger.isDebugEnabled()) {
Expand All @@ -366,6 +399,10 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
ensureNoShardData(nodePaths);
}

if (DiscoveryNode.isSearchNode(settings) == false) {
ensureNoCacheData(cacheNodePath);
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
success = true;
} finally {
Expand Down Expand Up @@ -888,6 +925,17 @@ public NodePath[] nodePaths() {
return nodePaths;
}

/**
* Returns the {@link NodePath} used for caching.
*/
public NodePath cacheNodePath() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
throw new IllegalStateException("node is not configured to store local location");
}
return cacheNodePath;
}

public int getNodeLockId() {
assertEnvIsLocked();
if (nodePaths == null || locks == null) {
Expand Down Expand Up @@ -1143,6 +1191,22 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
}
}

/**
* Throws an exception if cache exists on a non-search node.
*/
private void ensureNoCacheData(final NodePath cacheNodePath) throws IOException {
List<Path> cacheDataPaths = collectCacheDataPath(cacheNodePath);
if (cacheDataPaths.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
"node does not have the %s role but has shard data: %s. Use 'opensearch-node repurpose' tool to clean up",
DiscoveryNodeRole.SEARCH_ROLE.roleName(),
cacheDataPaths
);
throw new IllegalStateException(message);
}
}

private void ensureNoIndexMetadata(final NodePath[] nodePaths) throws IOException {
List<Path> indexMetadataPaths = collectIndexMetadataPaths(nodePaths);
if (indexMetadataPaths.isEmpty() == false) {
Expand Down Expand Up @@ -1200,6 +1264,34 @@ private static boolean isIndexMetadataPath(Path path) {
return Files.isDirectory(path) && path.getFileName().toString().equals(MetadataStateFormat.STATE_DIR_NAME);
}

/**
* Collect the path containing cache data in the indicated cache node path.
* The returned paths will point to the shard data folder.
*/
static List<Path> collectCacheDataPath(NodePath cacheNodePath) throws IOException {
List<Path> indexSubPaths = new ArrayList<>();
Path cachePath = cacheNodePath.cachePath;
if (Files.isDirectory(cachePath)) {
try (DirectoryStream<Path> nodeStream = Files.newDirectoryStream(cachePath)) {
for (Path nodePath : nodeStream) {
if (Files.isDirectory(nodePath)) {
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(nodePath)) {
for (Path indexPath : indexStream) {
if (Files.isDirectory(indexPath)) {
try (Stream<Path> shardStream = Files.list(indexPath)) {
shardStream.map(Path::toAbsolutePath).forEach(indexSubPaths::add);
}
}
}
}
}
}
}
}

return indexSubPaths;
}

/**
* Resolve the custom path for a index's shard.
*/
Expand Down
105 changes: 59 additions & 46 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,53 +458,9 @@ public synchronized IndexShard createShard(
try {
lock = nodeEnv.shardLock(shardId, "starting shard", TimeUnit.SECONDS.toMillis(5));
eventListener.beforeIndexShardCreated(shardId, indexSettings);
ShardPath path;
try {
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (IllegalStateException ex) {
logger.warn("{} failed to load shard path, trying to remove leftover", shardId);
try {
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings);
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (Exception inner) {
ex.addSuppressed(inner);
throw ex;
}
}

if (path == null) {
// TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
// that's being relocated/replicated we know how large it will become once it's done copying:
// Count up how many shards are currently on each data path:
Map<Path, Integer> dataPathToShardCount = new HashMap<>();
for (IndexShard shard : this) {
Path dataPath = shard.shardPath().getRootStatePath();
Integer curCount = dataPathToShardCount.get(dataPath);
if (curCount == null) {
curCount = 0;
}
dataPathToShardCount.put(dataPath, curCount + 1);
}
path = ShardPath.selectNewPathForShard(
nodeEnv,
shardId,
this.indexSettings,
routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE
? getAvgShardSizeInBytes()
: routing.getExpectedShardSize(),
dataPathToShardCount
);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);
}

if (shards.containsKey(shardId.id())) {
throw new IllegalStateException(shardId + " already exists");
}

ShardPath path = getShardPath(routing, shardId, lock);
logger.debug("creating shard_id {}", shardId);
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
// if we are on a shared FS we only own the shard (i.e. we can safely delete it) if we are the primary.
final Engine.Warmer engineWarmer = (reader) -> {
IndexShard shard = getShardOrNull(shardId.getId());
if (shard != null) {
Expand Down Expand Up @@ -573,6 +529,63 @@ public synchronized IndexShard createShard(
}
}

/*
Fetches the shard path based on the index type -
For a remote snapshot index, the cache path is used to initialize the shards.
For a local index, a local shard path is loaded or a new path is calculated.
*/
private ShardPath getShardPath(ShardRouting routing, ShardId shardId, ShardLock lock) throws IOException {
ShardPath path;
if (this.indexSettings.isRemoteSnapshot()) {
path = ShardPath.loadCachePath(nodeEnv, shardId);
} else {
try {
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (IllegalStateException ex) {
logger.warn("{} failed to load shard path, trying to remove leftover", shardId);
try {
ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings);
path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath());
} catch (Exception inner) {
ex.addSuppressed(inner);
throw ex;
}
}

if (path == null) {
// TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
// that's being relocated/replicated we know how large it will become once it's done copying:
// Count up how many shards are currently on each data path:
Map<Path, Integer> dataPathToShardCount = new HashMap<>();
for (IndexShard shard : this) {
Path dataPath = shard.shardPath().getRootStatePath();
Integer curCount = dataPathToShardCount.get(dataPath);
if (curCount == null) {
curCount = 0;
}
dataPathToShardCount.put(dataPath, curCount + 1);
}
path = ShardPath.selectNewPathForShard(
nodeEnv,
shardId,
this.indexSettings,
routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE
? getAvgShardSizeInBytes()
: routing.getExpectedShardSize(),
dataPathToShardCount
);
logger.debug("{} creating using a new path [{}]", shardId, path);
} else {
logger.debug("{} creating using an existing path [{}]", shardId, path);
}
}

if (shards.containsKey(shardId.id())) {
throw new IllegalStateException(shardId + " already exists");
}
return path;
}

@Override
public synchronized void removeShard(int shardId, String reason) {
final ShardId sId = new ShardId(index(), shardId);
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/ShardPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ public boolean isCustomDataPath() {
return isCustomDataPath;
}

/**
* Returns the shard path to be stored within the cache on the search capable node.
*/
public static ShardPath loadCachePath(NodeEnvironment env, ShardId shardId) {
NodeEnvironment.NodePath path = env.cacheNodePath();
final Path dataPath = env.resolveCustomLocation(path.cachePath.toString(), shardId);
final Path statePath = path.resolve(shardId);
return new ShardPath(true, dataPath, statePath, shardId);
}

/**
* This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple
* directories with a valid shard state exist the one with the highest version will be used.
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/monitor/fs/FsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static class Path implements Writeable, ToXContentObject {
long total = -1;
long free = -1;
long available = -1;
long cacheReserved = -1;

public Path() {}

Expand Down Expand Up @@ -101,6 +102,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(total);
out.writeLong(free);
out.writeLong(available);
out.writeLong(cacheReserved);
}

public String getPath() {
Expand All @@ -127,6 +129,10 @@ public ByteSizeValue getAvailable() {
return new ByteSizeValue(available);
}

public ByteSizeValue getReserved() {
return new ByteSizeValue(cacheReserved);
}

private long addLong(long current, long other) {
if (current == -1 && other == -1) {
return 0;
Expand All @@ -143,6 +149,7 @@ private long addLong(long current, long other) {
public void add(Path path) {
total = FsProbe.adjustForHugeFilesystems(addLong(total, path.total));
free = FsProbe.adjustForHugeFilesystems(addLong(free, path.free));
cacheReserved = FsProbe.adjustForHugeFilesystems(addLong(cacheReserved, path.cacheReserved));
available = FsProbe.adjustForHugeFilesystems(addLong(available, path.available));
}

Expand All @@ -156,6 +163,8 @@ static final class Fields {
static final String FREE_IN_BYTES = "free_in_bytes";
static final String AVAILABLE = "available";
static final String AVAILABLE_IN_BYTES = "available_in_bytes";
static final String CACHE_RESERVED = "cache_reserved";
static final String CACHE_RESERVED_IN_BYTES = "cache_reserved_in_bytes";
}

@Override
Expand All @@ -180,6 +189,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (available != -1) {
builder.humanReadableField(Fields.AVAILABLE_IN_BYTES, Fields.AVAILABLE, getAvailable());
}
if (cacheReserved != -1) {
builder.humanReadableField(Fields.CACHE_RESERVED_IN_BYTES, Fields.CACHE_RESERVED, getReserved());
}

builder.endObject();
return builder;
Expand Down
Loading

0 comments on commit e24ab81

Please sign in to comment.