Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support Remote Translog Functionality #5755

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
Expand Down Expand Up @@ -675,6 +676,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ public Iterator<Setting<?>> settings() {
public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";

public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled";

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -406,6 +409,43 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<String> INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
new Setting.Validator<>() {

@Override
public void validate(final String value) {}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(
"Setting " + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID"
);
} else {
final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
throw new IllegalArgumentException(
"Settings "
+ INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey()
+ " can only be set/enabled when "
+ INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey()
+ " is set to true"
);
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void close() {
// nothing to do here...
}

private synchronized Path buildAndCreate(BlobPath path) throws IOException {
protected synchronized Path buildAndCreate(BlobPath path) throws IOException {
Path f = buildPath(path);
if (readOnly == false) {
Files.createDirectories(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,19 @@ public void skipBytes(long count) {
pos += count;
}

// NOTE: AIOOBE not EOF if you read too much
@Override
public byte readByte() {
public byte readByte() throws EOFException {
if (eof()) {
throw new EOFException();
}
return bytes[pos++];
}

// NOTE: AIOOBE not EOF if you read too much
@Override
public void readBytes(byte[] b, int offset, int len) {
public void readBytes(byte[] b, int offset, int len) throws EOFException {
if (available() < len) {
throw new EOFException();
}
System.arraycopy(bytes, pos, b, offset, len);
pos += len;
}
Expand All @@ -111,6 +115,9 @@ protected void ensureCanReadBytes(int length) throws EOFException {

@Override
public int read() throws IOException {
if (eof()) {
throw new EOFException();
}
return bytes[pos++] & 0xFF;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FeatureFlags.REMOTE_STORE,
List.of(
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,8 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -555,7 +556,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
recoveryStateFactory,
repositoriesServiceSupplier
);
success = true;
return indexService;
Expand Down
18 changes: 17 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -172,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -203,7 +208,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -275,6 +281,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -517,6 +524,14 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary()
? new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down Expand Up @@ -547,6 +562,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
translogFactory,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,9 @@ public final class IndexSettings {
private final int numberOfShards;
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final String remoteStoreRepository;
private final boolean isRemoteTranslogStoreEnabled;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
private Version extendedCompatibilitySnapshotVersion;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -750,8 +751,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
Expand Down Expand Up @@ -1040,6 +1042,10 @@ public Version getExtendedCompatibilitySnapshotVersion() {
return extendedCompatibilitySnapshotVersion;
}

public String getRemoteStoreTranslogRepository() {
return remoteStoreTranslogRepository;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -150,6 +152,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

private final TranslogFactory translogFactory;

public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand Down Expand Up @@ -253,7 +257,8 @@ public EngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
false,
new InternalTranslogFactory()
);
}

Expand Down Expand Up @@ -284,7 +289,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
Expand Down Expand Up @@ -328,6 +334,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
this.translogFactory = translogFactory;
}

/**
Expand Down Expand Up @@ -532,6 +539,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying translog factory
* @return the translog factory
*/
public TranslogFactory getTranslogFactory() {
return translogFactory;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
isReadOnlyReplica
isReadOnlyReplica,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void onFailure(String reason, Exception ex) {
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
this::ensureOpen,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void onAfterTranslogSync() {
}
}
},
this
this,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Loading