Skip to content

Commit

Permalink
[Segment Replication] Refactor remote replication source
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Jul 18, 2023
1 parent 7642e43 commit a7f283d
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 91 deletions.
12 changes: 12 additions & 0 deletions libs/core/src/main/java/org/opensearch/core/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.core.index;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -134,6 +136,16 @@ public static Index fromXContent(final XContentParser parser) throws IOException
return INDEX_PARSER.parse(parser, null).build();
}

/**
 * Writes this index to the given {@link IndexOutput}: the index name first, followed by its UUID.
 *
 * @param out the output to write the name and UUID to
 * @throws IOException if writing to {@code out} fails
 */
public void writeTo(IndexOutput out) throws IOException {
    out.writeString(this.name);
    out.writeString(this.uuid);
}

public Index(IndexInput in) throws IOException {
this.name = in.readString();
this.uuid = in.readString();
}

/**
* Builder for Index objects. Used by ObjectParser instances only.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.core.index.shard;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -147,4 +149,15 @@ public int compareTo(ShardId o) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(toString());
}

/**
 * Writes this shard id to the given {@link IndexOutput}: the owning index first (delegating to
 * the index's own {@code writeTo}), followed by the shard number as a variable-length int.
 *
 * @param out the output to write to
 * @throws IOException if writing to {@code out} fails
 */
public void writeTo(IndexOutput out) throws IOException {
    this.index.writeTo(out);
    out.writeVInt(this.shardId);
}

/**
 * Creates a shard id by reading it from the given {@link IndexInput}: the index first, then the
 * shard number as a variable-length int. The hash code is computed once both have been read.
 *
 * @param in the input to read from
 * @throws IOException if reading from {@code in} fails
 */
public ShardId(IndexInput in) throws IOException {
    this.index = new Index(in);
    this.shardId = in.readVInt();
    // Computed last, after index and shardId have been assigned.
    this.hashCode = computeHashCode();
}
}
95 changes: 65 additions & 30 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2319,7 +2319,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
syncSegmentsFromRemoteSegmentStore(false, true, true);
syncSegmentsFromRemoteSegmentStore(false, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
if (syncFromRemote) {
Expand Down Expand Up @@ -4547,7 +4547,7 @@ public void close() throws IOException {
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false, true, true);
syncSegmentsFromRemoteSegmentStore(false, true);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
Expand Down Expand Up @@ -4604,14 +4604,53 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
}

/**
 * Downloads the given segment files from the remote segment store into the local store directory
 * and syncs them to disk. Intended for segment replication with the remote store enabled (both
 * settings are asserted).
 *
 * If initialising the remote directory yields no metadata, nothing is downloaded and an empty
 * list is returned.
 *
 * @param filesToFetch metadata of the files to download from the remote store
 * @return the files that were copied into the local store directory, in the order they were copied
 * @throws IOException if initialising the remote directory, copying a file, or syncing fails
 */
public List<StoreFileMetadata> syncSegmentsFromRemoteSegmentStore(List<StoreFileMetadata> filesToFetch) throws IOException {
    assert indexSettings.isSegRepEnabled() && indexSettings.isRemoteStoreEnabled();
    logger.trace("Downloading segments files from remote store {}", filesToFetch);
    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteDirectory();
    RemoteSegmentMetadata remoteSegmentMetadata = remoteSegmentStoreDirectory.init();
    List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
    if (remoteSegmentMetadata == null) {
        return downloadedSegments;
    }
    // Acquire each reference BEFORE entering the try that releases it, so that a failing incRef()
    // never leads to a decRef() on a reference that was not taken. Nesting the two try/finally
    // blocks also guarantees the store reference is released even if remoteStore.decRef() throws.
    store.incRef();
    try {
        remoteStore.incRef();
        try {
            final Directory storeDirectory = store.directory();
            logger.trace("--> storeDirectory {}", storeDirectory.getClass());
            String segmentNFile = null;
            for (StoreFileMetadata fileMetadata : filesToFetch) {
                String file = fileMetadata.name();
                logger.trace("--> Copying file {}", file);
                storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
                downloadedSegments.add(fileMetadata);
                if (file.startsWith(IndexFileNames.SEGMENTS)) {
                    // Only used to assert that at most one segments_N file is part of the request.
                    assert segmentNFile == null : "There should be only one SegmentInfosSnapshot file";
                    segmentNFile = file;
                }
            }
            storeDirectory.sync(downloadedSegments.stream().map(StoreFileMetadata::name).collect(Collectors.toList()));
        } finally {
            remoteStore.decRef();
        }
    } finally {
        store.decRef();
        // Logged on failure too; in that case the list holds the files copied before the error.
        logger.trace("Downloaded segments from remote store {}", downloadedSegments);
    }
    return downloadedSegments;
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @param shouldCommit if the shard requires committing the changes after sync from remote.
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit)
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync)
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
Expand Down Expand Up @@ -4653,34 +4692,30 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
if (shouldCommit) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
storeDirectory.deleteFile(localMaxSegmentInfos.get());
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest
// commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
if (localMaxSegmentInfos.isPresent()
&& infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get())
- 1) {
// If remote translog is not enabled, local translog will be created with different UUID.
// This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs
// to be same. Following code block make sure to have the same UUID.
if (indexSettings.isRemoteTranslogStoreEnabled() == false) {
SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo();
Map<String, String> userData = new HashMap<>(infosSnapshot.getUserData());
userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY));
infosSnapshot.setUserData(userData, false);
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else {
finalizeReplication(infosSnapshot);
storeDirectory.deleteFile(localMaxSegmentInfos.get());
}
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private synchronized void syncSegments(boolean isRetry) {
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
Expand Down Expand Up @@ -382,7 +382,7 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException {
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
Expand All @@ -399,8 +399,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
localSegmentsPostRefresh,
segmentInfosSnapshot,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
translogFileGeneration
translogFileGeneration,
replicationCheckpoint
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
remoteStore.incRef();
try {
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true);
indexShard.syncSegmentsFromRemoteSegmentStore(true, true);

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -569,12 +570,12 @@ public void uploadMetadata(
Collection<String> segmentFiles,
SegmentInfos segmentInfosSnapshot,
Directory storeDirectory,
long primaryTerm,
long translogGeneration
long translogGeneration,
ReplicationCheckpoint replicationCheckpoint
) throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
primaryTerm,
replicationCheckpoint.getPrimaryTerm(),
segmentInfosSnapshot.getGeneration(),
translogGeneration,
metadataUploadCounter.incrementAndGet(),
Expand Down Expand Up @@ -602,8 +603,7 @@ public void uploadMetadata(
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
replicationCheckpoint
)
);
}
Expand Down
9 changes: 5 additions & 4 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,22 +845,23 @@ private void cleanupFiles(Collection<String> filesToConsiderForCleanup, String r
* @param tmpToFileName Map of temporary replication file to actual file name
* @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file
* @param segmentsGen segment generation number
* @param consumer consumer for generated SegmentInfos
* @param finalizeConsumer consumer for generated SegmentInfos
* @throws IOException Exception while reading store and building segment infos
*/
public void buildInfosFromBytes(
Map<String, String> tmpToFileName,
byte[] infosBytes,
long segmentsGen,
CheckedConsumer<SegmentInfos, IOException> consumer
CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
CheckedConsumer<Map<String,String>, IOException> renameConsumer
) throws IOException {
metadataLock.writeLock().lock();
try {
final List<String> values = new ArrayList<>(tmpToFileName.values());
incRefFileDeleter(values);
try {
renameTempFilesSafe(tmpToFileName);
consumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
renameConsumer.accept(tmpToFileName);
finalizeConsumer.accept(buildSegmentInfos(infosBytes, segmentsGen));
} finally {
decRefFileDeleter(values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

/**
* Metadata object for Remote Segment
Expand All @@ -38,19 +39,29 @@ public class RemoteSegmentMetadata {

private final byte[] segmentInfosBytes;

private final long primaryTerm;
private final long generation;
// private final long primaryTerm;
// private final long generation;
//
// private final long version;
//
// private final long length;
//
// private final String codec;

/**
 * Returns the replication checkpoint held by this metadata object.
 *
 * @return the {@link ReplicationCheckpoint} this instance was constructed with
 */
public ReplicationCheckpoint getReplicationCheckpoint() {
    return this.replicationCheckpoint;
}

private final ReplicationCheckpoint replicationCheckpoint;

public RemoteSegmentMetadata(
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata,
byte[] segmentInfosBytes,
long primaryTerm,
long generation
ReplicationCheckpoint replicationCheckpoint
) {
this.metadata = metadata;
this.segmentInfosBytes = segmentInfosBytes;
this.generation = generation;
this.primaryTerm = primaryTerm;
this.replicationCheckpoint = replicationCheckpoint;
}

/**
Expand All @@ -66,11 +77,11 @@ public byte[] getSegmentInfosBytes() {
}

public long getGeneration() {
return generation;
return replicationCheckpoint.getSegmentsGen();
}

public long getPrimaryTerm() {
return primaryTerm;
return replicationCheckpoint.getPrimaryTerm();
}

/**
Expand Down Expand Up @@ -99,19 +110,17 @@ public static Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> f

public void write(IndexOutput out) throws IOException {
out.writeMapOfStrings(toMapOfStrings());
out.writeLong(generation);
out.writeLong(primaryTerm);
replicationCheckpoint.writeTo(out);
out.writeLong(segmentInfosBytes.length);
out.writeBytes(segmentInfosBytes, segmentInfosBytes.length);
}

public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
Map<String, String> metadata = indexInput.readMapOfStrings();
long generation = indexInput.readLong();
long primaryTerm = indexInput.readLong();
ReplicationCheckpoint replicationCheckpoint = new ReplicationCheckpoint(indexInput);
int byteArraySize = (int) indexInput.readLong();
byte[] segmentInfosBytes = new byte[byteArraySize];
indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, primaryTerm, generation);
return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public class GetSegmentFilesResponse extends TransportResponse {

/**
 * Returns the segment file metadata carried by this response.
 *
 * @return the {@code files} list itself (no defensive copy is made)
 */
public List<StoreFileMetadata> getFiles() {
    return this.files;
}

List<StoreFileMetadata> files;

public GetSegmentFilesResponse(List<StoreFileMetadata> files) {
Expand Down
Loading

0 comments on commit a7f283d

Please sign in to comment.