Skip to content

Commit

Permalink
Add RemoteDirectory instance to Store as a secondary directory
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed May 11, 2022
1 parent bcfd328 commit ce5f6cd
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 5 deletions.
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -118,6 +119,8 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -511,6 +514,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
22 changes: 21 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -135,6 +138,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -189,6 +193,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -259,6 +264,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
Expand Down Expand Up @@ -423,7 +429,8 @@ private long getAvgShardSizeInBytes() throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -497,10 +504,23 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository("dragon-stone");
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}
store = new Store(
shardId,
this.indexSettings,
directory,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;

public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
}
}
38 changes: 36 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final StoreDirectory directory;
private final StoreDirectory secondaryDirectory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
Expand All @@ -187,15 +188,36 @@ protected void closeInternal() {
};

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
this(shardId, indexSettings, directory, null, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, Directory secondaryDirectory, ShardLock shardLock) {
this(shardId, indexSettings, directory, secondaryDirectory, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) {
this(shardId, indexSettings, directory, null, shardLock, onClose);
}

public Store(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
Directory secondaryDirectory,
ShardLock shardLock,
OnClose onClose
) {
super(shardId, indexSettings);
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
if (secondaryDirectory != null) {
ByteSizeCachingDirectory sizeCachingSecondaryDir = new ByteSizeCachingDirectory(secondaryDirectory, refreshInterval);
this.secondaryDirectory = new StoreDirectory(sizeCachingSecondaryDir, Loggers.getLogger("index.store.deletes", shardId));
} else {
this.secondaryDirectory = null;
}
this.shardLock = shardLock;
this.onClose = onClose;

Expand All @@ -205,8 +227,20 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
}

public Directory directory() {
return directory(true);
}

public Directory secondaryDirectory() {
return directory(false);
}

private Directory directory(boolean primary) {
ensureOpen();
return directory;
if (primary) {
return directory;
} else {
return secondaryDirectory;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, repositoriesService);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -66,6 +67,22 @@ interface DirectoryFactory {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param repository to get the BlobContainer details
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
}

/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {

private RemoteDirectoryFactory remoteDirectoryFactory;

@Before
public void setup() {
remoteDirectoryFactory = new RemoteDirectoryFactory();
}

public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
BlobContainer blobContainer = mock(BlobContainer.class);
when(repository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());

Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
assertTrue(directory instanceof RemoteDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore).blobContainer(blobPathCaptor.capture());
BlobPath blobPath = blobPathCaptor.getValue();
assertEquals("foo/0/", blobPath.buildAsString());

directory.listAll();
verify(blobContainer).listBlobs();
}
}
19 changes: 19 additions & 0 deletions server/src/test/java/org/opensearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1138,4 +1138,23 @@ public void testGetPendingFiles() throws IOException {
}
}
}

public void testStoreWithoutSecondaryDirectory() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Directory primaryDirectory = new NIOFSDirectory(createTempDir("primary"));
try (Store store = new Store(shardId, INDEX_SETTINGS, primaryDirectory, new DummyShardLock(shardId))) {
assertEquals(primaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.directory())));
assertNull(store.secondaryDirectory());
}
}

public void testStoreWithSecondaryDirectory() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Directory primaryDirectory = new NIOFSDirectory(createTempDir("primary"));
Directory secondaryDirectory = new NIOFSDirectory(createTempDir("secondary"));
try (Store store = new Store(shardId, INDEX_SETTINGS, primaryDirectory, secondaryDirectory, new DummyShardLock(shardId))) {
assertEquals(primaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.directory())));
assertEquals(secondaryDirectory, FilterDirectory.unwrap(FilterDirectory.unwrap(store.secondaryDirectory())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, null);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode(
Expand Down

0 comments on commit ce5f6cd

Please sign in to comment.