[Backport 2.x] Add support for refresh level durability using remote segment store #5749

Merged
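
For context, refresh-level durability means that the segments created by each refresh are uploaded to a remote segment store, so a shard can be restored from the remote store up to its last refresh rather than only its last commit. Below is a minimal sketch of index settings that would enable the (then-experimental) remote store; both setting keys are assumed 2.x-era names and are not confirmed by this diff:

import org.opensearch.common.settings.Settings;

// Sketch only: settings that would enable the experimental remote segment
// store for a new index. Both keys are assumptions, not part of this PR.
public final class RemoteStoreSettingsSketch {
    static Settings remoteStoreEnabled(String repositoryName) {
        return Settings.builder()
            .put("index.remote_store.enabled", true) // assumed setting key
            .put("index.remote_store.repository", repositoryName) // assumed key; repository must already be registered
            .build();
    }
}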
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatibility in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -2932,7 +2932,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

103 changes: 103 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -35,6 +35,7 @@
import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
@@ -48,6 +49,12 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
@@ -89,6 +96,7 @@
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.gateway.WriteStateException;
@@ -144,6 +152,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
@@ -171,10 +180,12 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -202,8 +213,10 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* An OpenSearch index shard
@@ -2023,6 +2036,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
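// If the remote store is enabled, bring the local directory in sync with the
// remote segment store before the new engine opens, so the engine starts from
// the latest refresh-level state that was persisted remotely.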
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
@@ -4132,6 +4148,9 @@ public void close() throws IOException {
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
@@ -4157,6 +4176,90 @@ public void close() throws IOException {
onSettingsChanged();
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if an exception occurs while reading segments from the remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
// We need to call RemoteSegmentStoreDirectory.init() in order to get the latest metadata of the files that
// have been uploaded to the remote segment store.
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
((RemoteSegmentStoreDirectory) remoteDirectory).init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();
remoteStore.incRef();
List<String> downloadedSegments = new ArrayList<>();
List<String> skippedSegments = new ArrayList<>();
try {
String segmentInfosSnapshotFilename = null;
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (localSegmentFiles.contains(file)) {
storeDirectory.deleteFile(file);
}
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(file);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file";
segmentInfosSnapshotFilename = file;
}
} else {
skippedSegments.add(file);
}
}
if (segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
logger.info("Downloaded segments: {}", downloadedSegments);
logger.info("Skipped download for segments: {}", skippedSegments);
store.decRef();
remoteStore.decRef();
}
}
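
// Usage note (sketch, not part of this PR): both call sites above pass
// overrideLocal = false, so local segment files whose footer checksum already
// matches the remote copy are kept and skipped. A full restore onto a stale or
// empty local directory would pass true, as in this hypothetical helper:
private void restoreAllSegmentsFromRemote() throws IOException {
assert indexSettings.isRemoteStoreEnabled();
// overrideLocal = true: delete any local copy and re-download every segment
syncSegmentsFromRemoteSegmentStore(true);
}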

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
return true;
} else {
logger.warn("Checksum mismatch between local and remote segment file: {}, will override local file", file);
}
} catch (NoSuchFileException | FileNotFoundException e) {
logger.debug("File {} does not exist in local FS, downloading from remote store", file);
} catch (IOException e) {
logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file);
}
return false;
}
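
// Sketch, not part of this PR: CodecUtil.retrieveChecksum above only reads the
// checksum recorded in the Lucene codec footer; it does not re-hash the file
// contents. A stricter and slower variant would verify the entire file:
private boolean localFileVerifies(Directory localDirectory, String file) {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.READONCE)) {
CodecUtil.checksumEntireFile(indexInput); // throws CorruptIndexException on mismatch
return true;
} catch (IOException e) {
return false;
}
}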

/**
* Returns the maximum sequence number of either update or delete operations that have been processed in this shard
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -19,12 +19,16 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
@@ -44,6 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
@@ -88,46 +95,67 @@ public void afterRefresh(boolean didRefresh) {
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
// If a new segments_N file is present locally that has not yet been uploaded to the remote store, it
// is considered the first refresh after a commit, and a cleanup of stale commit files is triggered.
// This is done to avoid deleting stale commits after every refresh.
// Ideally, we want this to be done in an async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
// SegmentInfosSnapshot is a snapshot of the reader's view of segments and may not contain
// all the segments from the last commit if they have been merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit, and the following
// statement ensures that each metadata file always contains all the segments from the last
// commit plus the refreshed segments.
localSegmentsPostRefresh.addAll(
SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)
);
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);
.forEach(localSegmentsPostRefresh::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (uploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

remoteDirectory.uploadMetadata(
refreshedLocalFiles,
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
@@ -141,6 +169,39 @@ public void afterRefresh(boolean didRefresh) {
}
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
// We use lastRefreshedCheckpoint as the local checkpoint for the SegmentInfosSnapshot. This is better than using
// getProcessedLocalCheckpoint(), as the processed checkpoint can advance between reading the value and setting it
// in SegmentInfos.userData. That could lead to data loss because, during recovery, the translog is replayed based on
// LOCAL_CHECKPOINT_KEY.
// lastRefreshedCheckpoint is updated after refresh listeners are executed, which means InternalEngine.lastRefreshedCheckpoint()
// returns the checkpoint of the last refresh; this does not impact correctness, as duplicate sequence numbers
// will not be replayed.
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ indexShard.getEngine().getClass();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
segmentInfosSnapshot.write(indexOutput);
}
storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
return segmentInfoSnapshotFilename;
}
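
// The snapshot filename ties upload and restore together: it is built above as
// SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + <commit generation>, and parsed
// back in IndexShard.syncSegmentsFromRemoteSegmentStore via split("__")[1].
// A round-trip sketch (illustration only, not part of this PR):
static long parseSnapshotGeneration(String segmentInfoSnapshotFilename) {
// e.g. "segment_infos_snapshot_filename__7" -> 7
return Long.parseLong(segmentInfoSnapshotFilename.split("__")[1]);
}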

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);