Skip to content

Commit

Permalink
[Remote Store] Upload segments to remote store post refresh (opensear…
Browse files Browse the repository at this point in the history
…ch-project#3460)

* Add RemoteDirectory interface to copy segment files to/from remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add index level setting for remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener

Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Upload segment to remote store post refresh

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale committed Sep 2, 2022
1 parent 7187bd0 commit ecbfbcf
Show file tree
Hide file tree
Showing 24 changed files with 1,183 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final String SETTING_REMOTE_STORE = "index.remote_store";
/**
* Used to specify if the index data should be persisted in the remote store.
*/
public static final Setting<Boolean> INDEX_REMOTE_STORE_SETTING = Setting.boolSetting(
SETTING_REMOTE_STORE,
false,
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
*/
public static final Map<String, Setting> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.REPLICATION_TYPE,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE,
IndexMetadata.INDEX_REMOTE_STORE_SETTING
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class FeatureFlags {
*/
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";

/**
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
* Once the feature is ready for production release, this feature flag can be removed.
*/
public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled";

/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -118,6 +119,8 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -528,6 +531,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -96,6 +97,9 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -190,6 +195,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -260,6 +266,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
Expand Down Expand Up @@ -430,7 +437,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -504,6 +512,21 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}

store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -533,7 +556,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStoreRefreshListener
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ public final class IndexSettings {
private final Settings nodeSettings;
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -703,6 +704,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false);

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -946,6 +948,13 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

/**
* Returns if remote store is enabled for this index.
*/
public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -326,7 +328,8 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -410,6 +413,7 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3222,7 +3226,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));

if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener.add(remoteStoreRefreshListener);
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

private final Directory storeDirectory;
private final Directory remoteDirectory;
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
}

@Override
public void beforeRefresh() throws IOException {
// Do Nothing
}

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* The method also deletes segment files from remote store which are not part of local filesystem.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
Set<String> localFiles = Set.of(storeDirectory.listAll());
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
filesUploadedToRemoteStore.add(file);
} catch (NoSuchFileException e) {
logger.info(
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
e
);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});

Set<String> remoteFilesToBeDeleted = new HashSet<>();
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
try {
remoteDirectory.deleteFile(file);
remoteFilesToBeDeleted.add(file);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
}
});

remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
}
}
}
Loading

0 comments on commit ecbfbcf

Please sign in to comment.