Skip to content

Commit

Permalink
[Remote Store] Upload segments to remote store post refresh (opensear…
Browse files Browse the repository at this point in the history
…ch-project#3460)

* Add RemoteDirectory interface to copy segment files to/from remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add index level setting for remote store

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>

* Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener

Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Upload segment to remote store post refresh

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
2 people authored and Bukhtawar committed Jun 21, 2022
1 parent eda2f05 commit 7d3f24e
Show file tree
Hide file tree
Showing 24 changed files with 1,176 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final String SETTING_REMOTE_STORE = "index.remote_store";
/**
* Used to specify if the index data should be persisted in the remote store.
*/
public static final Setting<Boolean> INDEX_REMOTE_STORE_SETTING = Setting.boolSetting(
SETTING_REMOTE_STORE,
false,
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
*/
public static final Map<String, Setting> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.REPLICATION_TYPE,
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE,
IndexMetadata.INDEX_REMOTE_STORE_SETTING
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class FeatureFlags {
*/
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";

/**
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
* Once the feature is ready for production release, this feature flag can be removed.
*/
public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled";

/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -118,6 +119,8 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -516,6 +519,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -96,6 +97,9 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -190,6 +195,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -260,6 +266,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
Expand Down Expand Up @@ -430,7 +437,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -504,6 +512,21 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}

store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -533,7 +556,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStoreRefreshListener
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ public final class IndexSettings {
private final Settings nodeSettings;
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -686,6 +687,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false);

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -927,6 +929,13 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

/**
* Returns if remote store is enabled for this index.
*/
public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final ReferenceManager.RefreshListener checkpointRefreshListener;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -326,7 +328,8 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -414,6 +417,7 @@ public boolean shouldCache(Query query) {
} else {
this.checkpointRefreshListener = null;
}
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3139,11 +3143,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener;
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener.add(remoteStoreRefreshListener);
}
if (this.checkpointRefreshListener != null) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
internalRefreshListener.add(checkpointRefreshListener);
}

return this.engineConfigFactory.newEngineConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

private final Directory storeDirectory;
private final Directory remoteDirectory;
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
}

@Override
public void beforeRefresh() throws IOException {
// Do Nothing
}

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* The method also deletes segment files from remote store which are not part of local filesystem.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
Set<String> localFiles = Set.of(storeDirectory.listAll());
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
filesUploadedToRemoteStore.add(file);
} catch (NoSuchFileException e) {
logger.info(
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
e
);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});

Set<String> remoteFilesToBeDeleted = new HashSet<>();
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
try {
remoteDirectory.deleteFile(file);
remoteFilesToBeDeleted.add(file);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
}
});

remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
}
}
}
Loading

0 comments on commit 7d3f24e

Please sign in to comment.