diff --git a/CHANGELOG.md b/CHANGELOG.md index d0a3a6dea671a..4ea88243a5056 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) - [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) - Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865)) +- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782)) ### Dependencies - Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java new file mode 100644 index 0000000000000..08eef8dc8c945 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.CompositeDirectory; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.utils.FileTypeUtils; +import org.opensearch.indices.IndicesService; +import org.opensearch.node.Node; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE") +public class WritableWarmIT extends RemoteStoreBaseIntegTestCase { + + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int NUM_DOCS_IN_BULK = 1000; + + /* + Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + public void testWritableWarmFeatureFlagDisabled() { + Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build(); + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(clusterSettings); + internalTestCluster.startDataOnlyNode(clusterSettings); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) + .build(); + + try { + prepareCreate(INDEX_NAME).setSettings(indexSettings).get(); + fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled"); + } catch (SettingsException | IllegalArgumentException ex) { + assertEquals( + "unknown setting [" + + IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey() + + "] please check that any required plugins are installed, or check the " + + "breaking changes documentation for removed settings", + ex.getMessage() + ); + } + } + + public void testWritableWarmBasic() throws Exception { + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(); + internalTestCluster.startDataOnlyNode(); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) + .build(); + assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get()); + + // Verify from the cluster settings if the data locality is partial + GetIndexResponse getIndexResponse = client().admin() + .indices() + .getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true)) + .get(); + Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME); + assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); + + // Ingesting some docs + indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME); + + // ensuring cluster is green after performing force-merge + ensureGreen(); + + SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + + // Ingesting docs again before force merge + indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); + flushAndRefresh(INDEX_NAME); + + FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache(); + IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class) + .indexService(resolveIndex(INDEX_NAME)) + .getShardOrNull(0); + Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate()); + + // Force merging the index + Set filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll())); + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get(); + flushAndRefresh(INDEX_NAME); + Set filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll())); + + Set filesFromPreviousGenStillPresent = filesBeforeMerge.stream() + .filter(filesAfterMerge::contains) + .filter(file -> !FileTypeUtils.isLockFile(file)) + .filter(file -> !FileTypeUtils.isSegmentsFile(file)) + .collect(Collectors.toUnmodifiableSet()); + + // Asserting that after merge all the files from previous gen are no more part of the directory + assertTrue(filesFromPreviousGenStillPresent.isEmpty()); + + // Asserting that files from previous gen are not present in File Cache as well + filesBeforeMerge.stream() + .filter(file -> !FileTypeUtils.isLockFile(file)) + .filter(file -> !FileTypeUtils.isSegmentsFile(file)) + .forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file)))); + + // Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file + // leaks + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + fileCache.prune(); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 6fe8dec9c21b1..1488f5d30b4ba 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -41,6 +41,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.logging.Loggers; import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexSortConfig; @@ -260,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings { * is ready for production release, the feature flag can be removed, and the * setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}. */ - public static final Map> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(); + public static final Map> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of( + FeatureFlags.TIERED_REMOTE_INDEX, + List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING) + ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 3c4cb4fd596c1..4c494a6b35153 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -107,6 +107,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.logging.log4j.util.Strings.toRootUpperCase; + /** * IndexModule represents the central extension point for index level custom implementations like: *
    @@ -141,6 +143,17 @@ public final class IndexModule { Property.NodeScope ); + /** + * Index setting which used to determine how the data is cached locally fully or partially + */ + public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( + "index.store.data_locality", + DataLocalityType.FULL.name(), + DataLocalityType::getValueOf, + Property.IndexScope, + Property.NodeScope + ); + public static final Setting INDEX_RECOVERY_TYPE_SETTING = new Setting<>( "index.recovery.type", "", @@ -297,6 +310,7 @@ public Iterator> settings() { private final AtomicBoolean frozen = new AtomicBoolean(false); private final BooleanSupplier allowExpensiveQueries; private final Map recoveryStateFactories; + private final FileCache fileCache; /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -315,7 +329,8 @@ public IndexModule( final Map directoryFactories, final BooleanSupplier allowExpensiveQueries, final IndexNameExpressionResolver expressionResolver, - final Map recoveryStateFactories + final Map recoveryStateFactories, + final FileCache fileCache ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; @@ -327,6 +342,30 @@ public IndexModule( this.allowExpensiveQueries = allowExpensiveQueries; this.expressionResolver = expressionResolver; this.recoveryStateFactories = recoveryStateFactories; + this.fileCache = fileCache; + } + + public IndexModule( + final IndexSettings indexSettings, + final AnalysisRegistry analysisRegistry, + final EngineFactory engineFactory, + final EngineConfigFactory engineConfigFactory, + final Map directoryFactories, + final BooleanSupplier allowExpensiveQueries, + final IndexNameExpressionResolver expressionResolver, + final Map recoveryStateFactories + ) { + this( + indexSettings, + analysisRegistry, + engineFactory, + engineConfigFactory, + directoryFactories, + allowExpensiveQueries, + expressionResolver, + recoveryStateFactories, + null + ); } /** @@ -577,6 +616,40 @@ public boolean match(Settings settings) { } } + /** + * Indicates the locality of the data - whether it will be cached fully or partially + */ + public enum DataLocalityType { + /** + * Indicates that all the data will be cached locally + */ + FULL, + /** + * Indicates that only a subset of the data will be cached locally + */ + PARTIAL; + + private static final Map LOCALITY_TYPES; + + static { + final Map localityTypes = new HashMap<>(values().length); + for (final DataLocalityType dataLocalityType : values()) { + localityTypes.put(dataLocalityType.name(), dataLocalityType); + } + LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes); + } + + public static DataLocalityType getValueOf(final String localityType) { + Objects.requireNonNull(localityType, "No locality type given."); + final String localityTypeName = toRootUpperCase(localityType.trim()); + final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName); + if (type != null) { + return type; + } + throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "]."); + } + } + public static Type defaultStoreType(final boolean allowMmap) { if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) { return Type.HYBRIDFS; @@ -665,7 +738,8 @@ public IndexService newIndexService( translogFactorySupplier, clusterDefaultRefreshIntervalSupplier, recoverySettings, - remoteStoreSettings + remoteStoreSettings, + fileCache ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e501d7eff3f81..a7849bcf80474 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -55,6 +55,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.io.IOUtils; @@ -91,8 +92,10 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.index.store.CompositeDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.RemoteStoreSettings; @@ -188,6 +191,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final Supplier clusterDefaultRefreshIntervalSupplier; private final RecoverySettings recoverySettings; private final RemoteStoreSettings remoteStoreSettings; + private final FileCache fileCache; public IndexService( IndexSettings indexSettings, @@ -223,7 +227,8 @@ public IndexService( BiFunction translogFactorySupplier, Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + FileCache fileCache ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -301,9 +306,85 @@ public IndexService( this.translogFactorySupplier = translogFactorySupplier; this.recoverySettings = recoverySettings; this.remoteStoreSettings = remoteStoreSettings; + this.fileCache = fileCache; updateFsyncTaskIfNecessary(); } + public IndexService( + IndexSettings indexSettings, + IndexCreationContext indexCreationContext, + NodeEnvironment nodeEnv, + NamedXContentRegistry xContentRegistry, + SimilarityService similarityService, + ShardStoreDeleter shardStoreDeleter, + IndexAnalyzers indexAnalyzers, + EngineFactory engineFactory, + EngineConfigFactory engineConfigFactory, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ThreadPool threadPool, + ScriptService scriptService, + ClusterService clusterService, + Client client, + QueryCache queryCache, + IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, + IndexEventListener eventListener, + Function> wrapperFactory, + MapperRegistry mapperRegistry, + IndicesFieldDataCache indicesFieldDataCache, + List searchOperationListeners, + List indexingOperationListeners, + NamedWriteableRegistry namedWriteableRegistry, + BooleanSupplier idFieldDataEnabled, + BooleanSupplier allowExpensiveQueries, + IndexNameExpressionResolver expressionResolver, + ValuesSourceRegistry valuesSourceRegistry, + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, + BiFunction translogFactorySupplier, + Supplier clusterDefaultRefreshIntervalSupplier, + RecoverySettings recoverySettings, + RemoteStoreSettings remoteStoreSettings + ) { + this( + indexSettings, + indexCreationContext, + nodeEnv, + xContentRegistry, + similarityService, + shardStoreDeleter, + indexAnalyzers, + engineFactory, + engineConfigFactory, + circuitBreakerService, + bigArrays, + threadPool, + scriptService, + clusterService, + client, + queryCache, + directoryFactory, + remoteDirectoryFactory, + eventListener, + wrapperFactory, + mapperRegistry, + indicesFieldDataCache, + searchOperationListeners, + indexingOperationListeners, + namedWriteableRegistry, + idFieldDataEnabled, + allowExpensiveQueries, + expressionResolver, + valuesSourceRegistry, + recoveryStateFactory, + translogFactorySupplier, + clusterDefaultRefreshIntervalSupplier, + recoverySettings, + remoteStoreSettings, + null + ); + } + static boolean needsMapperService(IndexSettings indexSettings, IndexCreationContext indexCreationContext) { return false == (indexSettings.getIndexMetadata().getState() == IndexMetadata.State.CLOSE && indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service @@ -495,9 +576,9 @@ public synchronized IndexShard createShard( } }; Store remoteStore = null; + Directory remoteDirectory = null; boolean seedRemote = false; if (targetNode.isRemoteStoreNode()) { - final Directory remoteDirectory; if (this.indexSettings.isRemoteStoreEnabled()) { remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); } else { @@ -530,7 +611,16 @@ public synchronized IndexShard createShard( } } - Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory directory = null; + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) && + // TODO : Need to remove this check after support for hot indices is added in Composite Directory + this.indexSettings.isStoreLocalityPartial()) { + Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path); + directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache); + } else { + directory = directoryFactory.newDirectory(this.indexSettings, path); + } + store = new Store( shardId, this.indexSettings, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index de18f07cb3c87..f103cefbe5a9d 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -734,6 +734,7 @@ public static IndexMergePolicy fromString(String text) { private final int numberOfShards; private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; + private final boolean isStoreLocalityPartial; private volatile TimeValue remoteTranslogUploadBufferInterval; private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; @@ -934,6 +935,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); + isStoreLocalityPartial = settings.get( + IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), + IndexModule.DataLocalityType.FULL.toString() + ).equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString()); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY); @@ -1289,6 +1294,13 @@ public boolean isRemoteTranslogStoreEnabled() { return isRemoteStoreEnabled; } + /** + * Returns true if the store locality is partial + */ + public boolean isStoreLocalityPartial() { + return isStoreLocalityPartial; + } + /** * Returns true if this is remote/searchable snapshot */ diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 9322c462c0f22..d68798f063782 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -30,6 +30,7 @@ import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.CompositeDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; @@ -439,6 +440,7 @@ private void uploadNewSegments( logger.debug("Effective new segments files to upload {}", filteredFiles); ActionListener> mappedListener = ActionListener.map(listener, resp -> null); GroupedActionListener batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size()); + Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate(); for (String src : filteredFiles) { // Initializing listener here to ensure that the stats increment operations are thread-safe @@ -446,6 +448,9 @@ private void uploadNewSegments( ActionListener aggregatedListener = ActionListener.wrap(resp -> { statsListener.onSuccess(src); batchUploadListener.onResponse(resp); + if (directory instanceof CompositeDirectory) { + ((CompositeDirectory) directory).afterSyncToRemote(src); + } }, ex -> { logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex); if (ex instanceof CorruptIndexException) { diff --git a/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java b/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java new file mode 100644 index 0000000000000..291f714369a74 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/CloseableFilterIndexOutput.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.store.FilterIndexOutput; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * FilterIndexOutput which takes in an additional FunctionalInterface as a parameter to perform required operations once the IndexOutput is closed + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CloseableFilterIndexOutput extends FilterIndexOutput { + + /** + * Functional Interface which takes the name of the file as input on which the required operations are to be performed + */ + @FunctionalInterface + public interface OnCloseListener { + void onClose(String name) throws IOException; + } + + private final OnCloseListener onCloseListener; + private final String fileName; + private final AtomicBoolean isClosed; + + public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListener onCloseListener) { + super("CloseableFilterIndexOutput for file " + fileName, out); + this.fileName = fileName; + this.onCloseListener = onCloseListener; + this.isClosed = new AtomicBoolean(false); + } + + @Override + public void close() throws IOException { + if (isClosed.get() == false) { + super.close(); + onCloseListener.onClose(fileName); + isClosed.set(true); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java new file mode 100644 index 0000000000000..eb89c86ae687f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -0,0 +1,344 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Version; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; +import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput; +import org.opensearch.index.store.remote.filecache.CachedIndexInput; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.utils.BlockIOContext; +import org.opensearch.index.store.remote.utils.FileTypeUtils; +import org.opensearch.index.store.remote.utils.TransferManager; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Composite Directory will contain both local and remote directory + * Consumers of Composite directory need not worry whether file is in local or remote + * All such abstractions will be handled by the Composite directory itself + * Implements all required methods by Directory abstraction + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeDirectory extends FilterDirectory { + private static final Logger logger = LogManager.getLogger(CompositeDirectory.class); + private final FSDirectory localDirectory; + private final RemoteSegmentStoreDirectory remoteDirectory; + private final FileCache fileCache; + private final TransferManager transferManager; + + /** + * Constructor to initialise the composite directory + * @param localDirectory corresponding to the local FSDirectory + * @param remoteDirectory corresponding to the remote directory + * @param fileCache used to cache the remote files locally + */ + public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, FileCache fileCache) { + super(localDirectory); + validate(localDirectory, remoteDirectory, fileCache); + this.localDirectory = (FSDirectory) localDirectory; + this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory; + this.fileCache = fileCache; + transferManager = new TransferManager( + (name, position, length) -> new InputStreamIndexInput( + remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)), + length + ), + fileCache + ); + } + + /** + * Returns names of all files stored in this directory in sorted order + * Does not include locally stored block files (having _block_ in their names) and files pending deletion + * + * @throws IOException in case of I/O error + */ + @Override + public String[] listAll() throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: listAll() called", this::toString); + String[] localFiles = localDirectory.listAll(); + Set allFiles = new HashSet<>(Arrays.asList(localFiles)); + String[] remoteFiles = getRemoteFiles(); + allFiles.addAll(Arrays.asList(remoteFiles)); + logger.trace("Composite Directory[{}]: Local Directory files - {}", this::toString, () -> Arrays.toString(localFiles)); + logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles)); + Set nonBlockLuceneFiles = allFiles.stream() + .filter(file -> !FileTypeUtils.isBlockFile(file)) + .collect(Collectors.toUnmodifiableSet()); + String[] files = new String[nonBlockLuceneFiles.size()]; + nonBlockLuceneFiles.toArray(files); + Arrays.sort(files); + logger.trace("Composite Directory[{}]: listAll() returns : {}", this::toString, () -> Arrays.toString(files)); + return files; + } + + /** + * Removes an existing file in the directory. + * Currently deleting only from local directory as files from remote should not be deleted as that is taken care by garbage collection logic of remote directory + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public void deleteFile(String name) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: deleteFile() called {}", this::toString, () -> name); + if (FileTypeUtils.isTempFile(name)) { + localDirectory.deleteFile(name); + } else if (Arrays.asList(listAll()).contains(name) == false) { + throw new NoSuchFileException("File " + name + " not found in directory"); + } else { + fileCache.remove(getFilePath(name)); + } + } + + /** + * Returns the byte length of a file in the directory. + * Throws {@link NoSuchFileException} or {@link FileNotFoundException} in case file is not present locally and in remote as well + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public long fileLength(String name) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: fileLength() called {}", this::toString, () -> name); + long fileLength; + Path key = getFilePath(name); + if (FileTypeUtils.isTempFile(name) || fileCache.get(key) != null) { + try { + fileLength = localDirectory.fileLength(name); + logger.trace( + "Composite Directory[{}]: fileLength of {} fetched from Local - {}", + this::toString, + () -> name, + () -> fileLength + ); + } finally { + fileCache.decRef(key); + } + } else { + fileLength = remoteDirectory.fileLength(name); + logger.trace( + "Composite Directory[{}]: fileLength of {} fetched from Remote - {}", + this::toString, + () -> name, + () -> fileLength + ); + } + return fileLength; + } + + /** + * Creates a new, empty file in the directory and returns an {@link IndexOutput} instance for + * appending data to this file. + * @param name the name of the file to create. + * @throws IOException in case of I/O error + */ + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: createOutput() called {}", this::toString, () -> name); + // The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file + return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile); + } + + /** + * Ensures that any writes to these files are moved to stable storage (made durable). + * @throws IOException in case of I/O error + */ + @Override + public void sync(Collection names) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: sync() called {}", this::toString, () -> names); + Collection remoteFiles = Arrays.asList(getRemoteFiles()); + Collection filesToSync = names.stream().filter(name -> remoteFiles.contains(name) == false).collect(Collectors.toList()); + logger.trace("Composite Directory[{}]: Synced files : {}", this::toString, () -> filesToSync); + localDirectory.sync(filesToSync); + } + + /** + * Renames {@code source} file to {@code dest} file where {@code dest} must not already exist in + * the directory. + * @throws IOException in case of I/O error + */ + @Override + public void rename(String source, String dest) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: rename() called : source-{}, dest-{}", this::toString, () -> source, () -> dest); + localDirectory.rename(source, dest); + fileCache.remove(getFilePath(source)); + cacheFile(dest); + } + + /** + * Opens a stream for reading an existing file. + * Check whether the file is present locally or in remote and return the IndexInput accordingly + * @param name the name of an existing file. + * @throws IOException in case of I/O error + */ + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: openInput() called {}", this::toString, () -> name); + // We aren't tracking temporary files (created via createTempOutput) currently in FileCache as these are created and then deleted + // within a very short span of time + // We will be reading them directory from the local directory + if (FileTypeUtils.isTempFile(name)) { + return localDirectory.openInput(name, context); + } + // Return directly from the FileCache (via TransferManager) if complete file is present + Path key = getFilePath(name); + CachedIndexInput indexInput = fileCache.get(key); + if (indexInput != null) { + logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name); + try { + return indexInput.getIndexInput().clone(); + } finally { + fileCache.decRef(key); + } + } + // If file has been uploaded to the Remote Store, fetch it from the Remote Store in blocks via OnDemandCompositeBlockIndexInput + else { + logger.trace( + "Composite Directory[{}]: Complete file {} not in FileCache, to be fetched in Blocks from Remote", + this::toString, + () -> name + ); + RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteDirectory.getSegmentsUploadedToRemoteStore() + .get(name); + if (uploadedSegmentMetadata == null) { + throw new NoSuchFileException("File " + name + " not found in directory"); + } + // TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific + BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo( + name, + new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST), + null + ); + return new OnDemandBlockSnapshotIndexInput(fileInfo, localDirectory, transferManager); + } + } + + /** + * Closing the local directory here + * @throws IOException in case of I/O error + */ + @Override + public void close() throws IOException { + ensureOpen(); + logger.trace("Composite Directory[{}]: close() called", this::toString); + localDirectory.close(); + } + + @Override + public String toString() { + return "Composite Directory @ " + Integer.toHexString(hashCode()); + } + + /** + * Function to perform operations once files have been uploaded to Remote Store + * Currently deleting the local files here, as once uploaded to Remote, local files become eligible for eviction from FileCache + * @param file : recent files which have been successfully uploaded to Remote Store + */ + public void afterSyncToRemote(String file) { + ensureOpen(); + /* + Decrementing the refCount here for the path so that it becomes eligible for eviction + This is a temporary solution until pinning support is added + TODO - Unpin the files here from FileCache so that they become eligible for eviction, once pinning/unpinning support is added in FileCache + Uncomment the below commented line(to remove the file from cache once uploaded) to test block based functionality + */ + logger.trace( + "Composite Directory[{}]: File {} uploaded to Remote Store and now can be eligible for eviction in FileCache", + this::toString, + () -> file + ); + fileCache.decRef(getFilePath(file)); + // fileCache.remove(getFilePath(fileName)); + } + + // Visibility public since we need it in IT tests + public Path getFilePath(String name) { + return localDirectory.getDirectory().resolve(name); + } + + /** + * Basic validations for Composite Directory parameters (null checks and instance type checks) + * + * Note: Currently Composite Directory only supports local directory to be of type FSDirectory + * The reason is that FileCache currently has it key type as Path + * Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache + * TODO : Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion + */ + private void validate(Directory localDirectory, Directory remoteDirectory, FileCache fileCache) { + if (localDirectory == null || remoteDirectory == null) throw new IllegalStateException( + "Local and remote directory cannot be null for Composite Directory" + ); + if (fileCache == null) throw new IllegalStateException( + "File Cache not initialized on this Node, cannot create Composite Directory without FileCache" + ); + if (localDirectory instanceof FSDirectory == false) throw new IllegalStateException( + "For Composite Directory, local directory must be of type FSDirectory" + ); + if (remoteDirectory instanceof RemoteSegmentStoreDirectory == false) throw new IllegalStateException( + "For Composite Directory, remote directory must be of type RemoteSegmentStoreDirectory" + ); + } + + /** + * Return the list of files present in Remote + */ + private String[] getRemoteFiles() throws IOException { + String[] remoteFiles; + try { + remoteFiles = remoteDirectory.listAll(); + } catch (NullPointerException e) { + /* + We can encounter NPE when no data has been uploaded to remote store yet and as a result the metadata is empty + Empty metadata means that there are no files currently in remote, hence returning an empty list in this scenario + TODO : Catch the NPE in listAll of RemoteSegmentStoreDirectory itself instead of catching here + */ + remoteFiles = new String[0]; + } + return remoteFiles; + } + + private void cacheFile(String name) throws IOException { + Path filePath = getFilePath(name); + // put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote + // so that it can be evicted after that + // this is just a temporary solution, will pin the file once support for that is added in FileCache + // TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been + // successfully uploaded to Remote + fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT))); + } + +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 99f78130ad3ef..c61fae74c0bc0 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -28,9 +28,11 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.index.store.exception.ChecksumCombinationException; +import org.opensearch.index.store.remote.utils.BlockIOContext; import java.io.FileNotFoundException; import java.io.IOException; @@ -203,10 +205,14 @@ public IndexInput openInput(String name, IOContext context) throws IOException { public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException { InputStream inputStream = null; try { - inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + if (context instanceof BlockIOContext) { + return getBlockInput(name, fileLength, (BlockIOContext) context); + } else { + inputStream = blobContainer.readBlob(name); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); + } } catch (Exception e) { - // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. + // In case the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) { try { inputStream.close(); @@ -434,4 +440,18 @@ private long calculateChecksumOfChecksum(Directory directory, String file) throw } } } + + private IndexInput getBlockInput(String name, long fileLength, BlockIOContext blockIOContext) throws IOException { + long position = blockIOContext.getBlockStart(); + long length = blockIOContext.getBlockSize(); + if (position < 0 || length < 0 || (position + length > fileLength)) { + throw new IllegalArgumentException("Invalid values of block start and size"); + } + byte[] bytes; + try (InputStream inputStream = blobContainer.readBlob(name, position, length)) { + // TODO - Explore how we can buffer small chunks of data instead of having the whole 8MB block in memory + bytes = downloadRateLimiter.apply(inputStream).readAllBytes(); + } + return new ByteArrayIndexInput(name, bytes); + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java index 7cfa738e75e52..177f0526e7571 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java @@ -94,7 +94,7 @@ private Future createRemoteSnapshotDirectoryFromSnapsho assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot : "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot"; final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot; - TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache); + TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache); return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager); }); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index 8097fd08da50a..ad56127394779 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.file; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.IndexInput; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -26,6 +28,7 @@ * @opensearch.internal */ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput { + private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class); /** * Where this class fetches IndexInput parts from */ @@ -133,10 +136,19 @@ protected OnDemandBlockSnapshotIndexInput buildSlice(String sliceDescription, lo @Override protected IndexInput fetchBlock(int blockId) throws IOException { - final String blockFileName = fileName + "." + blockId; + logger.trace("fetchBlock called with blockId -> {}", blockId); + final String blockFileName = fileName + "_block_" + blockId; final long blockStart = getBlockStart(blockId); final long blockEnd = blockStart + getActualBlockSize(blockId); + logger.trace( + "File: {} , Block File: {} , BlockStart: {} , BlockEnd: {} , OriginalFileSize: {}", + fileName, + blockFileName, + blockStart, + blockEnd, + originalFileSize + ); // Block may be present on multiple chunks of a file, so we need // to fetch each chunk/blob part separately to fetch an entire block. diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/CachedFullFileIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/CachedFullFileIndexInput.java new file mode 100644 index 0000000000000..286739cb6cd90 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/CachedFullFileIndexInput.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implementation of the CachedIndexInput for full files which takes in an IndexInput as parameter + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CachedFullFileIndexInput implements CachedIndexInput { + private final FileCache fileCache; + private final Path path; + private final FullFileCachedIndexInput fullFileCachedIndexInput; + private final AtomicBoolean isClosed; + + /** + * Constructor - takes IndexInput as parameter + */ + public CachedFullFileIndexInput(FileCache fileCache, Path path, IndexInput indexInput) { + this.fileCache = fileCache; + this.path = path; + fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, path, indexInput); + isClosed = new AtomicBoolean(false); + } + + /** + * Returns the wrapped indexInput + */ + @Override + public IndexInput getIndexInput() { + if (isClosed.get()) throw new AlreadyClosedException("Index input is already closed"); + return fullFileCachedIndexInput; + } + + /** + * Returns the length of the wrapped indexInput + */ + @Override + public long length() { + return fullFileCachedIndexInput.length(); + } + + /** + * Checks if the wrapped indexInput is closed + */ + @Override + public boolean isClosed() { + return isClosed.get(); + } + + /** + * Closes the wrapped indexInput + */ + @Override + public void close() throws Exception { + if (!isClosed.getAndSet(true)) { + fullFileCachedIndexInput.close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index c14c551eb22da..2491e525993a0 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.filecache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IndexInput; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -48,9 +50,9 @@ */ @PublicApi(since = "2.7.0") public class FileCache implements RefCountedCache { + private static final Logger logger = LogManager.getLogger(FileCache.class); // This constant moved, but exists here for backward compatibility public static final Setting DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING; - private final SegmentedCache theCache; private final CircuitBreaker circuitBreaker; @@ -143,6 +145,14 @@ public CacheStats stats() { return theCache.stats(); } + // To be used only for debugging purposes + public void logCurrentState() { + logger.trace("CURRENT STATE OF FILE CACHE \n"); + CacheUsage cacheUsage = theCache.usage(); + logger.trace("Total Usage: " + cacheUsage.usage() + " , Active Usage: " + cacheUsage.activeUsage()); + theCache.logCurrentState(); + } + /** * Ensures that the PARENT breaker is not tripped when an entry is added to the cache * @param filePath the path key for which entry is added diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java index 7d7c40be3a833..ab6f5f931da0f 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java @@ -39,9 +39,9 @@ public class FileCachedIndexInput extends IndexInput implements RandomAccessInpu protected IndexInput luceneIndexInput; /** indicates if this IndexInput instance is a clone or not */ - private final boolean isClone; + protected final boolean isClone; - private volatile boolean closed = false; + protected volatile boolean closed = false; public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) { this(cache, filePath, underlyingIndexInput, false); diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java new file mode 100644 index 0000000000000..9383c53d6d830 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Set; + +/** + * Extension of {@link FileCachedIndexInput} for full files for handling clones and slices + * We maintain a clone map so that we can close them when the parent IndexInput is closed so that ref count is properly maintained in file cache + * Closing of clones explicitly is needed as Lucene does not guarantee that it will close the clones + * https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33 + * @opensearch.experimental + */ +@ExperimentalApi +public class FullFileCachedIndexInput extends FileCachedIndexInput { + private static final Logger logger = LogManager.getLogger(FullFileCachedIndexInput.class); + private final Set clones; + + public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) { + this(cache, filePath, underlyingIndexInput, false); + } + + public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) { + super(cache, filePath, underlyingIndexInput, isClone); + clones = new HashSet<>(); + } + + /** + * Clones the index input and returns the clone + * Increase the ref count whenever the index input is cloned and add it to the clone map as well + */ + @Override + public FullFileCachedIndexInput clone() { + FullFileCachedIndexInput clonedIndexInput = new FullFileCachedIndexInput(cache, filePath, luceneIndexInput.clone(), true); + cache.incRef(filePath); + clones.add(clonedIndexInput); + return clonedIndexInput; + } + + /** + * Clones the index input and returns the slice + * Increase the ref count whenever the index input is sliced and add it to the clone map as well + */ + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if (offset < 0 || length < 0 || offset + length > this.length()) { + throw new IllegalArgumentException( + "slice() " + + sliceDescription + + " out of bounds: offset=" + + offset + + ",length=" + + length + + ",fileLength=" + + this.length() + + ": " + + this + ); + } + IndexInput slicedLuceneIndexInput = luceneIndexInput.slice(sliceDescription, offset, length); + FullFileCachedIndexInput slicedIndexInput = new FullFileCachedIndexInput(cache, filePath, slicedLuceneIndexInput, true); + clones.add(slicedIndexInput); + cache.incRef(filePath); + return slicedIndexInput; + } + + /** + * Closes the index input and it's clones as well + */ + @Override + public void close() throws IOException { + if (!closed) { + if (isClone) { + cache.decRef(filePath); + } + clones.forEach(indexInput -> { + try { + indexInput.close(); + } catch (Exception e) { + logger.trace("Exception while closing clone - {}", e.getMessage()); + } + }); + try { + luceneIndexInput.close(); + } catch (AlreadyClosedException e) { + logger.trace("FullFileCachedIndexInput already closed"); + } + luceneIndexInput = null; + clones.clear(); + closed = true; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java new file mode 100644 index 0000000000000..a78dd85d6f194 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/BlockIOContext.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.apache.lucene.store.IOContext; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * BlockIOContext is an extension of IOContext which can be used to pass block related information to the openInput() method of any directory + * + * @opensearch.experimental + */ +@ExperimentalApi +public class BlockIOContext extends IOContext { + + private long blockStart; + private long blockSize; + + /** + * Constructor to initialise BlockIOContext with block related information + */ + public BlockIOContext(IOContext ctx, long blockStart, long blockSize) { + super(ctx.context); + verifyBlockStartAndSize(blockStart, blockSize); + this.blockStart = blockStart; + this.blockSize = blockSize; + } + + /** + * Getter for blockStart + */ + public long getBlockStart() { + return blockStart; + } + + /** + * Getter for blockSize + */ + public long getBlockSize() { + return blockSize; + } + + private void verifyBlockStartAndSize(long blockStart, long blockSize) { + if (blockStart < 0) throw new IllegalArgumentException("blockStart must be greater than or equal to 0"); + if (blockSize <= 0) throw new IllegalArgumentException(("blockSize must be greater than 0")); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java b/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java new file mode 100644 index 0000000000000..e78480bd500ee --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Utility class for checking file types + * + * @opensearch.experimental + */ +@ExperimentalApi +public class FileTypeUtils { + + public static boolean isTempFile(String name) { + return name.endsWith(".tmp"); + } + + public static boolean isBlockFile(String name) { + return name.contains("_block_"); + } + + public static boolean isExtraFSFile(String name) { + return name.startsWith("extra"); + } + + public static boolean isLockFile(String name) { + return name.endsWith(".lock"); + } + + public static boolean isSegmentsFile(String name) { + return name.startsWith("segments_"); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 98cad7bfadb09..df26f2f0925f6 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; @@ -39,11 +38,19 @@ public class TransferManager { private static final Logger logger = LogManager.getLogger(TransferManager.class); - private final BlobContainer blobContainer; + /** + * Functional interface to get an InputStream for a file at a certain offset and size + */ + @FunctionalInterface + public interface StreamReader { + InputStream read(String name, long position, long length) throws IOException; + } + + private final StreamReader streamReader; private final FileCache fileCache; - public TransferManager(final BlobContainer blobContainer, final FileCache fileCache) { - this.blobContainer = blobContainer; + public TransferManager(final StreamReader streamReader, final FileCache fileCache) { + this.streamReader = streamReader; this.fileCache = fileCache; } @@ -55,12 +62,15 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException { final Path key = blobFetchRequest.getFilePath(); + logger.trace("fetchBlob called for {}", key.toString()); final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest); + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); } else { + logger.trace("Transfer Manager - Already in cache"); // already in the cache and ready to be used (open) return cachedIndexInput; } @@ -77,7 +87,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio } @SuppressWarnings("removal") - private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { // We need to do a privileged action here in order to fetch from remote // and write to the local file cache in case this is invoked as a side // effect of a plugin (such as a scripted search) that doesn't have the @@ -85,13 +95,14 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo return AccessController.doPrivileged((PrivilegedAction) () -> { try { if (Files.exists(request.getFilePath()) == false) { + logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); try ( OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath()); OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream) ) { for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) { try ( - InputStream snapshotFileInputStream = blobContainer.readBlob( + InputStream snapshotFileInputStream = streamReader.read( blobPart.getBlobName(), blobPart.getPosition(), blobPart.getLength() @@ -119,15 +130,15 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo */ private static class DelayedCreationCachedIndexInput implements CachedIndexInput { private final FileCache fileCache; - private final BlobContainer blobContainer; + private final StreamReader streamReader; private final BlobFetchRequest request; private final CompletableFuture result = new CompletableFuture<>(); private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private DelayedCreationCachedIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) { + private DelayedCreationCachedIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { this.fileCache = fileCache; - this.blobContainer = blobContainer; + this.streamReader = streamReader; this.request = request; } @@ -139,7 +150,7 @@ public IndexInput getIndexInput() throws IOException { if (isStarted.getAndSet(true) == false) { // We're the first one here, need to download the block try { - result.complete(createIndexInput(fileCache, blobContainer, request)); + result.complete(createIndexInput(fileCache, streamReader, request)); } catch (Exception e) { result.completeExceptionally(e); fileCache.remove(request.getFilePath()); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index 03d03711f914a..7f7d42e8fbce8 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.utils.cache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; @@ -19,6 +21,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -43,6 +46,7 @@ * @opensearch.internal */ class LRUCache implements RefCountedCache { + private static final Logger logger = LogManager.getLogger(LRUCache.class); private final long capacity; private final HashMap> data; @@ -192,8 +196,10 @@ public void clear() { usage = 0L; activeUsage = 0L; lru.clear(); - for (Node node : data.values()) { - data.remove(node.key); + final Iterator> iterator = data.values().iterator(); + while (iterator.hasNext()) { + Node node = iterator.next(); + iterator.remove(); statsCounter.recordRemoval(node.weight); listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.EXPLICIT)); } @@ -300,6 +306,22 @@ public CacheStats stats() { } } + // To be used only for debugging purposes + public void logCurrentState() { + lock.lock(); + try { + String allFiles = "\n"; + for (Map.Entry> entry : data.entrySet()) { + String path = entry.getKey().toString(); + String file = path.substring(path.lastIndexOf('/')); + allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n"; + } + logger.trace("Cache entries : " + allFiles); + } finally { + lock.unlock(); + } + } + private void addNode(K key, V value) { final long weight = weigher.weightOf(value); Node newNode = new Node<>(key, value, weight); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java index d3eb03df37e1b..2ea7ea8dbee12 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java @@ -8,6 +8,8 @@ package org.opensearch.index.store.remote.utils.cache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.Weigher; @@ -25,6 +27,7 @@ * @opensearch.internal */ public class SegmentedCache implements RefCountedCache { + private static final Logger logger = LogManager.getLogger(SegmentedCache.class); private static final int HASH_BITS = 0x7fffffff; @@ -183,6 +186,16 @@ public CacheStats stats() { return new CacheStats(hitCount, missCount, removeCount, removeWeight, replaceCount, evictionCount, evictionWeight); } + // To be used only for debugging purposes + public void logCurrentState() { + int i = 0; + for (RefCountedCache cache : table) { + logger.trace("SegmentedCache " + i); + ((LRUCache) cache).logCurrentState(); + i++; + } + } + enum SingletonWeigher implements Weigher { INSTANCE; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 8c02120225897..7fbb5062692ba 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -138,6 +138,7 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats; +import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TranslogFactory; @@ -356,6 +357,7 @@ public class IndicesService extends AbstractLifecycleComponent private final BiFunction translogFactorySupplier; private volatile TimeValue clusterDefaultRefreshInterval; private final SearchRequestStats searchRequestStats; + private final FileCache fileCache; @Override protected void doStart() { @@ -390,7 +392,8 @@ public IndicesService( @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, RecoverySettings recoverySettings, CacheService cacheService, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + FileCache fileCache ) { this.settings = settings; this.threadPool = threadPool; @@ -497,6 +500,68 @@ protected void closeInternal() { .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); this.recoverySettings = recoverySettings; this.remoteStoreSettings = remoteStoreSettings; + this.fileCache = fileCache; + } + + public IndicesService( + Settings settings, + PluginsService pluginsService, + NodeEnvironment nodeEnv, + NamedXContentRegistry xContentRegistry, + AnalysisRegistry analysisRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + MapperRegistry mapperRegistry, + NamedWriteableRegistry namedWriteableRegistry, + ThreadPool threadPool, + IndexScopedSettings indexScopedSettings, + CircuitBreakerService circuitBreakerService, + BigArrays bigArrays, + ScriptService scriptService, + ClusterService clusterService, + Client client, + MetaStateService metaStateService, + Collection>> engineFactoryProviders, + Map directoryFactories, + ValuesSourceRegistry valuesSourceRegistry, + Map recoveryStateFactories, + IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier, + SearchRequestStats searchRequestStats, + @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + RecoverySettings recoverySettings, + CacheService cacheService, + RemoteStoreSettings remoteStoreSettings + ) { + this( + settings, + pluginsService, + nodeEnv, + xContentRegistry, + analysisRegistry, + indexNameExpressionResolver, + mapperRegistry, + namedWriteableRegistry, + threadPool, + indexScopedSettings, + circuitBreakerService, + bigArrays, + scriptService, + clusterService, + client, + metaStateService, + engineFactoryProviders, + directoryFactories, + valuesSourceRegistry, + recoveryStateFactories, + remoteDirectoryFactory, + repositoriesServiceSupplier, + searchRequestStats, + remoteStoreStatsTrackerFactory, + recoverySettings, + cacheService, + remoteStoreSettings, + null + ); } /** @@ -876,7 +941,8 @@ private synchronized IndexService createIndexService( directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, - recoveryStateFactories + recoveryStateFactories, + fileCache ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -966,7 +1032,8 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, - recoveryStateFactories + recoveryStateFactories, + fileCache ); pluginsService.onIndexModule(indexModule); return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a05d4e10aa1b9..62011b1986dd6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -868,7 +868,8 @@ protected Node( remoteStoreStatsTrackerFactory, recoverySettings, cacheService, - remoteStoreSettings + remoteStoreSettings, + fileCache ); final IngestService ingestService = new IngestService( @@ -1999,7 +2000,8 @@ DiscoveryNode getNode() { * Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined. */ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException { - if (DiscoveryNode.isSearchNode(settings)) { + boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING); + if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) { NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath(); long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes(); FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath)); @@ -2008,7 +2010,10 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake // Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set. if (capacity == 0) { // If node is not a dedicated search node without configuration, prevent cache initialization - if (DiscoveryNode.getRolesFromSettings(settings).stream().anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { + if (!isWritableRemoteIndexEnabled + && DiscoveryNode.getRolesFromSettings(settings) + .stream() + .anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) { throw new SettingsException( "Unable to initialize the " + DiscoveryNodeRole.SEARCH_ROLE.roleName() diff --git a/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java new file mode 100644 index 0000000000000..ff9b62a341deb --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/BaseRemoteSegmentStoreDirectoryTests.java @@ -0,0 +1,178 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.index.SegmentInfos; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static org.opensearch.index.store.RemoteSegmentStoreDirectory.METADATA_FILES_TO_FETCH; +import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; +import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BaseRemoteSegmentStoreDirectoryTests extends IndexShardTestCase { + + protected RemoteDirectory remoteDataDirectory; + protected RemoteDirectory remoteMetadataDirectory; + protected RemoteStoreMetadataLockManager mdLockManager; + protected RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + protected TestUploadListener testUploadTracker; + protected IndexShard indexShard; + protected SegmentInfos segmentInfos; + protected ThreadPool threadPool; + + protected final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 1, + 1, + "node-1" + ); + + protected final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 23, + 34, + 2, + 1, + "node-2" + ); + protected final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 12, + 13, + 34, + 1, + 1, + "node-1" + ); + protected final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 10, + 38, + 34, + 1, + 1, + "node-1" + ); + protected final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( + 10, + 36, + 34, + 1, + 1, + "node-1" + ); + + public void setupRemoteSegmentStoreDirectory() throws IOException { + remoteDataDirectory = mock(RemoteDirectory.class); + remoteMetadataDirectory = mock(RemoteDirectory.class); + mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); + testUploadTracker = new TestUploadListener(); + + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool, + indexShard.shardId() + ); + try (Store store = indexShard.store()) { + segmentInfos = store.readLastCommittedSegmentsInfo(); + } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); + when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(executorService); + } + + protected Map> populateMetadata() throws IOException { + List metadataFiles = new ArrayList<>(); + + metadataFiles.add(metadataFilename); + metadataFiles.add(metadataFilename2); + metadataFiles.add(metadataFilename3); + + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + METADATA_FILES_TO_FETCH + ) + ).thenReturn(List.of(metadataFilename)); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + Integer.MAX_VALUE + ) + ).thenReturn(metadataFiles); + + Map> metadataFilenameContentMapping = Map.of( + metadataFilename, + getDummyMetadata("_0", 1), + metadataFilename2, + getDummyMetadata("_0", 1), + metadataFilename3, + getDummyMetadata("_0", 1) + ); + + when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + when(remoteMetadataDirectory.getBlobStream(metadataFilename2)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename2), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + when(remoteMetadataDirectory.getBlobStream(metadataFilename3)).thenAnswer( + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename3), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) + ); + + return metadataFilenameContentMapping; + } + + @After + public void tearDown() throws Exception { + indexShard.close("test tearDown", true, false); + super.tearDown(); + } + +} diff --git a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java new file mode 100644 index 0000000000000..d5628cfab9ee7 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java @@ -0,0 +1,202 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; +import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; +import org.opensearch.index.store.remote.utils.FileTypeUtils; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTests { + private FileCache fileCache; + private FSDirectory localDirectory; + private CompositeDirectory compositeDirectory; + + private final static String[] LOCAL_FILES = new String[] { "_1.cfe", "_2.cfe", "_0.cfe_block_7", "_0.cfs_block_7", "temp_file.tmp" }; + private final static String FILE_PRESENT_LOCALLY = "_1.cfe"; + private final static String FILE_PRESENT_IN_REMOTE_ONLY = "_0.si"; + private final static String NON_EXISTENT_FILE = "non_existent_file"; + private final static String NEW_FILE = "new_file"; + private final static String TEMP_FILE = "temp_file.tmp"; + private final static int FILE_CACHE_CAPACITY = 10000; + + @Before + public void setup() throws IOException { + setupRemoteSegmentStoreDirectory(); + populateMetadata(); + remoteSegmentStoreDirectory.init(); + localDirectory = FSDirectory.open(createTempDir()); + removeExtraFSFiles(); + fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache); + addFilesToDirectory(LOCAL_FILES); + } + + public void testListAll() throws IOException { + String[] actualFileNames = compositeDirectory.listAll(); + String[] expectedFileNames = new String[] { "_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "temp_file.tmp" }; + assertArrayEquals(expectedFileNames, actualFileNames); + } + + public void testDeleteFile() throws IOException { + assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY)); + // Delete the file and assert that it no more is a part of the directory + compositeDirectory.deleteFile(FILE_PRESENT_LOCALLY); + assertFalse(existsInCompositeDirectory(FILE_PRESENT_LOCALLY)); + // Reading deleted file from directory should result in NoSuchFileException + assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT)); + } + + public void testFileLength() throws IOException { + // File present locally + assertTrue(existsInLocalDirectory(FILE_PRESENT_LOCALLY)); + assertFalse(existsInRemoteDirectory(FILE_PRESENT_LOCALLY)); + assertEquals(compositeDirectory.fileLength(FILE_PRESENT_LOCALLY), localDirectory.fileLength(FILE_PRESENT_LOCALLY)); + + // File not present locally - present in Remote + assertFalse(existsInLocalDirectory(FILE_PRESENT_IN_REMOTE_ONLY)); + assertTrue(existsInRemoteDirectory(FILE_PRESENT_IN_REMOTE_ONLY)); + assertEquals( + compositeDirectory.fileLength(FILE_PRESENT_IN_REMOTE_ONLY), + remoteSegmentStoreDirectory.fileLength(FILE_PRESENT_IN_REMOTE_ONLY) + ); + + // File not present in both local and remote + assertFalse(Arrays.asList(compositeDirectory.listAll()).contains(NON_EXISTENT_FILE)); + assertThrows(NoSuchFileException.class, () -> compositeDirectory.fileLength(NON_EXISTENT_FILE)); + } + + public void testCreateOutput() throws IOException { + try (IndexOutput indexOutput = compositeDirectory.createOutput(NEW_FILE, IOContext.DEFAULT)) { + // File not present in FileCache until the indexOutput is Closed + assertNull(fileCache.get(localDirectory.getDirectory().resolve(NEW_FILE))); + } + // File present in FileCache after the indexOutput is Closed + assertNotNull(fileCache.get(localDirectory.getDirectory().resolve(NEW_FILE))); + } + + public void testSync() throws IOException { + // All the files in the below list are present either locally or on remote, so sync should work as expected + Collection names = List.of("_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1"); + compositeDirectory.sync(names); + // Below list contains a non-existent file, hence will throw an error + Collection names1 = List.of("_0.cfe", "_0.cfs", "_0.si", "_1.cfe", "_2.cfe", "segments_1", "non_existent_file"); + assertThrows(NoSuchFileException.class, () -> compositeDirectory.sync(names1)); + } + + public void testRename() throws IOException { + // Rename should work as expected for file present in directory + assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY)); + compositeDirectory.rename(FILE_PRESENT_LOCALLY, "_1_new.cfe"); + // Should throw error for file not present + assertThrows(NoSuchFileException.class, () -> compositeDirectory.rename(NON_EXISTENT_FILE, "_1_new.cfe")); + } + + public void testOpenInput() throws IOException { + // File not present in Directory + assertFalse(existsInCompositeDirectory(NON_EXISTENT_FILE)); + assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(NON_EXISTENT_FILE, IOContext.DEFAULT)); + + // Temp file, read directly form local directory + assertTrue(existsInLocalDirectory(TEMP_FILE) && FileTypeUtils.isTempFile(TEMP_FILE)); + assertEquals( + compositeDirectory.openInput(TEMP_FILE, IOContext.DEFAULT).toString(), + localDirectory.openInput(TEMP_FILE, IOContext.DEFAULT).toString() + ); + + // File present in file cache + assertNotNull(fileCache.get(getFilePath(FILE_PRESENT_LOCALLY))); + assertTrue(compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT) instanceof FileCachedIndexInput); + + // File present in Remote + assertFalse(existsInLocalDirectory(FILE_PRESENT_IN_REMOTE_ONLY)); + assertTrue(existsInRemoteDirectory(FILE_PRESENT_IN_REMOTE_ONLY)); + assertTrue(compositeDirectory.openInput(FILE_PRESENT_IN_REMOTE_ONLY, IOContext.DEFAULT) instanceof OnDemandBlockSnapshotIndexInput); + } + + public void testClose() throws IOException { + // Similar to delete, when close is called existing openInput should be able to function properly but new requests should not be + // served + IndexInput indexInput = compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT); + compositeDirectory.close(); + // Any operations after close will throw AlreadyClosedException + assertThrows(AlreadyClosedException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT)); + // Existing open IndexInputs will be served + indexInput.getFilePointer(); + indexInput.close(); + assertThrows(RuntimeException.class, indexInput::getFilePointer); + assertThrows(AlreadyClosedException.class, () -> compositeDirectory.close()); + } + + public void testAfterSyncToRemote() throws IOException { + // File will be present locally until uploaded to Remote + assertTrue(existsInLocalDirectory(FILE_PRESENT_LOCALLY)); + compositeDirectory.afterSyncToRemote(FILE_PRESENT_LOCALLY); + fileCache.prune(); + // After uploading to Remote, refCount will be decreased by 1 making it 0 and will be evicted if cache is pruned + assertFalse(existsInLocalDirectory(FILE_PRESENT_LOCALLY)); + // Asserting file is not present in FileCache + assertNull(fileCache.get(getFilePath(FILE_PRESENT_LOCALLY))); + } + + private void addFilesToDirectory(String[] files) throws IOException { + for (String file : files) { + IndexOutput indexOutput = compositeDirectory.createOutput(file, IOContext.DEFAULT); + indexOutput.close(); + } + } + + private void removeExtraFSFiles() throws IOException { + HashSet allFiles = new HashSet<>(Arrays.asList(localDirectory.listAll())); + allFiles.stream().filter(FileTypeUtils::isExtraFSFile).forEach(file -> { + try { + localDirectory.deleteFile(file); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private boolean existsInLocalDirectory(String name) throws IOException { + return Arrays.asList(localDirectory.listAll()).contains(name); + } + + private boolean existsInRemoteDirectory(String name) throws IOException { + return Arrays.asList(remoteSegmentStoreDirectory.listAll()).contains(name); + } + + private boolean existsInCompositeDirectory(String name) throws IOException { + return Arrays.asList(compositeDirectory.listAll()).contains(name); + } + + private Path getFilePath(String name) { + return localDirectory.getDirectory().resolve(name); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index ee81369725e6f..ed79a2b0bd8e4 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -20,7 +20,9 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.action.ActionListener; +import org.opensearch.index.store.remote.utils.BlockIOContext; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -216,6 +218,17 @@ public void testOpenInput() throws IOException { assertTrue(indexInput instanceof RemoteIndexInput); assertEquals(100, indexInput.length()); verify(blobContainer).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC); + + BlockIOContext blockIOContextInvalidValues = new BlockIOContext(IOContext.DEFAULT, 10, 1000); + assertThrows(IllegalArgumentException.class, () -> remoteDirectory.openInput("segment_1", blockIOContextInvalidValues)); + + BlockIOContext blockIOContext = new BlockIOContext(IOContext.DEFAULT, 10, 50); + when(blobContainer.readBlob("segment_1", 10, 50)).thenReturn(mockInputStream); + byte[] bytes = new byte[(int) blockIOContext.getBlockSize()]; + when(mockInputStream.readAllBytes()).thenReturn(bytes); + indexInput = remoteDirectory.openInput("segment_1", blockIOContext); + assertTrue(indexInput instanceof ByteArrayIndexInput); + assertEquals(blockIOContext.getBlockSize(), indexInput.length()); } public void testOpenInputWithLength() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 567199cf64cd8..574c5bf620474 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; @@ -23,34 +22,25 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Version; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; import org.junit.Before; import java.io.ByteArrayInputStream; @@ -64,7 +54,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -87,95 +76,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { - private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectoryTests.class); - private RemoteDirectory remoteDataDirectory; - private RemoteDirectory remoteMetadataDirectory; - private RemoteStoreMetadataLockManager mdLockManager; - - private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; - private TestUploadListener testUploadTracker; - private IndexShard indexShard; - private SegmentInfos segmentInfos; - private ThreadPool threadPool; - - private final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 23, - 34, - 1, - 1, - "node-1" - ); - - private final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 23, - 34, - 2, - 1, - "node-2" - ); - private final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 12, - 13, - 34, - 1, - 1, - "node-1" - ); - private final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 10, - 38, - 34, - 1, - 1, - "node-1" - ); - private final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename( - 10, - 36, - 34, - 1, - 1, - "node-1" - ); +public class RemoteSegmentStoreDirectoryTests extends BaseRemoteSegmentStoreDirectoryTests { @Before public void setup() throws IOException { - remoteDataDirectory = mock(RemoteDirectory.class); - remoteMetadataDirectory = mock(RemoteDirectory.class); - mdLockManager = mock(RemoteStoreMetadataLockManager.class); - threadPool = mock(ThreadPool.class); - testUploadTracker = new TestUploadListener(); - - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build(); - ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); - - indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( - remoteDataDirectory, - remoteMetadataDirectory, - mdLockManager, - threadPool, - indexShard.shardId() - ); - try (Store store = indexShard.store()) { - segmentInfos = store.readLastCommittedSegmentsInfo(); - } - - when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); - when(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY)).thenReturn(executorService); - when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(executorService); - } - - @After - public void tearDown() throws Exception { - indexShard.close("test tearDown", true, false); - super.tearDown(); + setupRemoteSegmentStoreDirectory(); } public void testUploadedSegmentMetadataToString() { @@ -256,60 +161,6 @@ public void testInitMultipleMetadataFile() throws IOException { assertThrows(IllegalStateException.class, () -> remoteSegmentStoreDirectory.init()); } - private Map> populateMetadata() throws IOException { - List metadataFiles = new ArrayList<>(); - - metadataFiles.add(metadataFilename); - metadataFiles.add(metadataFilename2); - metadataFiles.add(metadataFilename3); - - when( - remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - METADATA_FILES_TO_FETCH - ) - ).thenReturn(List.of(metadataFilename)); - when( - remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( - RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, - Integer.MAX_VALUE - ) - ).thenReturn(metadataFiles); - - Map> metadataFilenameContentMapping = Map.of( - metadataFilename, - getDummyMetadata("_0", 1), - metadataFilename2, - getDummyMetadata("_0", 1), - metadataFilename3, - getDummyMetadata("_0", 1) - ); - - when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - when(remoteMetadataDirectory.getBlobStream(metadataFilename2)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename2), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - when(remoteMetadataDirectory.getBlobStream(metadataFilename3)).thenAnswer( - I -> createMetadataFileBytes( - metadataFilenameContentMapping.get(metadataFilename3), - indexShard.getLatestReplicationCheckpoint(), - segmentInfos - ) - ); - - return metadataFilenameContentMapping; - } - public void testInit() throws IOException { populateMetadata(); diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index a135802c5f49c..c7d0cc0c5b96e 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -207,7 +207,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) { // write 48, -80 alternatively for (int i = 0; i < numOfBlocks; i++) { // create normal blocks - String blockName = BLOCK_FILE_PREFIX + "." + i; + String blockName = BLOCK_FILE_PREFIX + "_block_" + i; IndexOutput output = fsDirectory.createOutput(blockName, null); // since block size is always even number, safe to do division for (int j = 0; j < blockSize / 2; j++) { @@ -221,7 +221,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) { if (numOfBlocks > 1 && sizeOfLastBlock != 0) { // create last block - String lastBlockName = BLOCK_FILE_PREFIX + "." + numOfBlocks; + String lastBlockName = BLOCK_FILE_PREFIX + "_block_" + numOfBlocks; IndexOutput output = fsDirectory.createOutput(lastBlockName, null); for (int i = 0; i < sizeOfLastBlock; i++) { if ((i & 1) == 0) { diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java new file mode 100644 index 0000000000000..258bc2db4c5d0 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Path; + +public class FileCachedIndexInputTests extends OpenSearchTestCase { + + protected FileCache fileCache; + protected Path filePath; + protected IndexInput underlyingIndexInput; + private FileCachedIndexInput fileCachedIndexInput; + + protected static final int FILE_CACHE_CAPACITY = 1000; + protected static final String TEST_FILE = "test_file"; + protected static final String SLICE_DESC = "slice_description"; + + @Before + public void setup() throws IOException { + Path basePath = createTempDir("FileCachedIndexInputTests"); + FSDirectory fsDirectory = FSDirectory.open(basePath); + IndexOutput indexOutput = fsDirectory.createOutput(TEST_FILE, IOContext.DEFAULT); + // Writing to the file so that it's size is not zero + indexOutput.writeInt(100); + indexOutput.close(); + filePath = basePath.resolve(TEST_FILE); + underlyingIndexInput = fsDirectory.openInput(TEST_FILE, IOContext.DEFAULT); + fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); + } + + protected void setupIndexInputAndAddToFileCache() { + fileCachedIndexInput = new FileCachedIndexInput(fileCache, filePath, underlyingIndexInput); + fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fileCachedIndexInput)); + } + + public void testClone() throws IOException { + setupIndexInputAndAddToFileCache(); + + // Since the file ia already in cache and has refCount 1, activeUsage and totalUsage will be same + assertTrue(isActiveAndTotalUsageSame()); + + // Decrementing the refCount explicitly on the file which will make it inactive (as refCount will drop to 0) + fileCache.decRef(filePath); + assertFalse(isActiveAndTotalUsageSame()); + + // After cloning the refCount will increase again and activeUsage and totalUsage will be same again + FileCachedIndexInput clonedFileCachedIndexInput = fileCachedIndexInput.clone(); + assertTrue(isActiveAndTotalUsageSame()); + + // Closing the clone will again decrease the refCount making it 0 + clonedFileCachedIndexInput.close(); + assertFalse(isActiveAndTotalUsageSame()); + } + + public void testSlice() throws IOException { + setupIndexInputAndAddToFileCache(); + assertThrows(UnsupportedOperationException.class, () -> fileCachedIndexInput.slice(SLICE_DESC, 10, 100)); + } + + protected boolean isActiveAndTotalUsageSame() { + return fileCache.usage().activeUsage() == fileCache.usage().usage(); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java new file mode 100644 index 0000000000000..7fb7a03584e20 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.filecache; + +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.IndexInput; + +import java.io.IOException; + +public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests { + private FullFileCachedIndexInput fullFileCachedIndexInput; + + @Override + protected void setupIndexInputAndAddToFileCache() { + fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput); + fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput)); + } + + @Override + public void testClone() throws IOException { + setupIndexInputAndAddToFileCache(); + + // Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same + assertTrue(isActiveAndTotalUsageSame()); + + // Getting the file cache entry (which wil increase the ref count, hence doing dec ref immediately afterwards) + CachedIndexInput cachedIndexInput = fileCache.get(filePath); + fileCache.decRef(filePath); + + // Decrementing the refCount explicitly on the file which will make it inactive (as refCount will drop to 0) + fileCache.decRef(filePath); + assertFalse(isActiveAndTotalUsageSame()); + + // After cloning the refCount will increase again and activeUsage and totalUsage will be same again + FileCachedIndexInput clonedFileCachedIndexInput1 = fullFileCachedIndexInput.clone(); + FileCachedIndexInput clonedFileCachedIndexInput2 = clonedFileCachedIndexInput1.clone(); + FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone(); + assertTrue(isActiveAndTotalUsageSame()); + + // closing the first level clone will close all subsequent level clones and reduce ref count to 0 + clonedFileCachedIndexInput1.close(); + assertFalse(isActiveAndTotalUsageSame()); + + fileCache.prune(); + + // since the file cache entry was evicted the corresponding CachedIndexInput will be closed and will throw exception when trying to + // read the index input + assertThrows(AlreadyClosedException.class, cachedIndexInput::getIndexInput); + } + + @Override + public void testSlice() throws IOException { + setupIndexInputAndAddToFileCache(); + + // Throw IllegalArgumentException if offset is negative + assertThrows(IllegalArgumentException.class, () -> fullFileCachedIndexInput.slice(SLICE_DESC, -1, 10)); + + // Throw IllegalArgumentException if length is negative + assertThrows(IllegalArgumentException.class, () -> fullFileCachedIndexInput.slice(SLICE_DESC, 5, -1)); + + // Decrementing the refCount explicitly on the file which will make it inactive (as refCount will drop to 0) + fileCache.decRef(filePath); + assertFalse(isActiveAndTotalUsageSame()); + + // Creating a slice will increase the refCount + IndexInput slicedFileCachedIndexInput = fullFileCachedIndexInput.slice(SLICE_DESC, 1, 2); + assertTrue(isActiveAndTotalUsageSame()); + + // Closing the parent will close all the slices as well decreasing the refCount to 0 + fullFileCachedIndexInput.close(); + assertFalse(isActiveAndTotalUsageSame()); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerBlobContainerReaderTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerBlobContainerReaderTests.java new file mode 100644 index 0000000000000..24e57cde3ffda --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerBlobContainerReaderTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.opensearch.common.blobstore.BlobContainer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class TransferManagerBlobContainerReaderTests extends TransferManagerTestCase { + private BlobContainer blobContainer; + + @Override + protected void initializeTransferManager() throws IOException { + blobContainer = mock(BlobContainer.class); + doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong()); + transferManager = new TransferManager(blobContainer::readBlob, fileCache); + } + + protected void mockExceptionWhileReading() throws IOException { + doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong()); + } + + protected void mockWaitForLatchReader(CountDownLatch latch) throws IOException { + doAnswer(i -> { + latch.await(); + return new ByteArrayInputStream(createData()); + }).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong()); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerRemoteDirectoryReaderTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerRemoteDirectoryReaderTests.java new file mode 100644 index 0000000000000..e777a287bf10f --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerRemoteDirectoryReaderTests.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.utils; + +import org.apache.lucene.store.IOContext; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.index.store.RemoteDirectory; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class TransferManagerRemoteDirectoryReaderTests extends TransferManagerTestCase { + private RemoteDirectory remoteDirectory; + + @Override + protected void initializeTransferManager() throws IOException { + remoteDirectory = mock(RemoteDirectory.class); + doAnswer(i -> new ByteArrayIndexInput("blob", createData())).when(remoteDirectory).openInput(eq("blob"), any()); + transferManager = new TransferManager( + (name, position, length) -> new InputStreamIndexInput( + remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)), + length + ), + fileCache + ); + } + + protected void mockExceptionWhileReading() throws IOException { + doThrow(new IOException("Expected test exception")).when(remoteDirectory).openInput(eq("failure-blob"), any()); + } + + protected void mockWaitForLatchReader(CountDownLatch latch) throws IOException { + doAnswer(i -> { + latch.await(); + return new ByteArrayIndexInput("blocking-blob", createData()); + }).when(remoteDirectory).openInput(eq("blocking-blob"), any()); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java similarity index 87% rename from server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java rename to server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 7ae3944eb6944..810a4c336fdf7 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -13,7 +13,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.SimpleFSLockFactory; -import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; @@ -24,7 +23,6 @@ import org.junit.After; import org.junit.Before; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -36,31 +34,23 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) -public class TransferManagerTests extends OpenSearchTestCase { - private static final int EIGHT_MB = 1024 * 1024 * 8; - private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( +public abstract class TransferManagerTestCase extends OpenSearchTestCase { + protected static final int EIGHT_MB = 1024 * 1024 * 8; + protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( EIGHT_MB * 2, 1, new NoopCircuitBreaker(CircuitBreaker.REQUEST) ); - private MMapDirectory directory; - private BlobContainer blobContainer; - private TransferManager transferManager; + protected MMapDirectory directory; + protected TransferManager transferManager; @Before public void setUp() throws Exception { super.setUp(); directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE); - blobContainer = mock(BlobContainer.class); - doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong()); - transferManager = new TransferManager(blobContainer, fileCache); + initializeTransferManager(); } @After @@ -68,7 +58,7 @@ public void tearDown() throws Exception { super.tearDown(); } - private static byte[] createData() { + protected static byte[] createData() { final byte[] data = new byte[EIGHT_MB]; data[EIGHT_MB - 1] = 7; return data; @@ -162,7 +152,7 @@ public void testUsageExceedsCapacity() throws Exception { } public void testDownloadFails() throws Exception { - doThrow(new IOException("Expected test exception")).when(blobContainer).readBlob(eq("failure-blob"), anyLong(), anyLong()); + mockExceptionWhileReading(); List blobParts = new ArrayList<>(); blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB)); expectThrows( @@ -177,10 +167,7 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception // Mock a call for a blob that will block until the latch is released, // then start the fetch for that blob on a separate thread final CountDownLatch latch = new CountDownLatch(1); - doAnswer(i -> { - latch.await(); - return new ByteArrayInputStream(createData()); - }).when(blobContainer).readBlob(eq("blocking-blob"), anyLong(), anyLong()); + mockWaitForLatchReader(latch); List blobParts = new ArrayList<>(); blobParts.add(new BlobFetchRequest.BlobPart("blocking-blob", 0, EIGHT_MB)); @@ -206,6 +193,12 @@ public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception assertFalse(blockingThread.isAlive()); } + protected abstract void initializeTransferManager() throws IOException; + + protected abstract void mockExceptionWhileReading() throws IOException; + + protected abstract void mockWaitForLatchReader(CountDownLatch latch) throws IOException; + private IndexInput fetchBlobWithName(String blobname) throws IOException { List blobParts = new ArrayList<>(); blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB)); diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java index b11740b53e11f..97e9fb288136d 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/cache/RefCountedCacheTestCase.java @@ -211,6 +211,15 @@ public void testComputeStats() { assertEquals(1, refCountedCache.stats().removeCount()); } + public void testClear() { + refCountedCache.put("1", 10L); + refCountedCache.put("2", 10L); + refCountedCache.put("3", 10L); + assertEquals(30L, refCountedCache.usage().usage()); + refCountedCache.clear(); + assertEquals(0L, refCountedCache.usage().usage()); + } + private void assertUsage(long usage, long activeUsage) { assertEquals(usage, refCountedCache.usage().usage()); assertEquals(activeUsage, refCountedCache.usage().activeUsage()); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 055bcd159efc3..cbb6ca0d1b0fd 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2095,6 +2095,10 @@ protected boolean addMockTransportService() { return true; } + protected boolean addMockIndexStorePlugin() { + return true; + } + /** Returns {@code true} iff this test cluster should use a dummy http transport */ protected boolean addMockHttpTransport() { return true; @@ -2137,7 +2141,7 @@ protected Collection> getMockPlugins() { if (randomBoolean() && addMockTransportService()) { mocks.add(MockTransportService.TestPlugin.class); } - if (randomBoolean()) { + if (randomBoolean() && addMockIndexStorePlugin()) { mocks.add(MockFSIndexStore.TestPlugin.class); } if (randomBoolean()) {