Add support for refresh level durability (opensearch-project#5749)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
sachinpkale authored and Sachin Kale committed Jan 9, 2023
1 parent ee33c9e commit 026ab36
Showing 12 changed files with 403 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatibility in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -2932,7 +2932,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

103 changes: 103 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -35,6 +35,7 @@
import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
@@ -48,6 +49,12 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
@@ -89,6 +96,7 @@
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.gateway.WriteStateException;
@@ -144,6 +152,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
@@ -171,10 +180,12 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -202,8 +213,10 @@
import java.util.stream.StreamSupport;

import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* An OpenSearch index shard
@@ -2023,6 +2036,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
verifyNotClosed();
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
@@ -4132,6 +4148,9 @@ public void close() throws IOException {
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
@@ -4157,6 +4176,90 @@ public void close() throws IOException {
onSettingsChanged();
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
((RemoteSegmentStoreDirectory) remoteDirectory).init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();
remoteStore.incRef();
List<String> downloadedSegments = new ArrayList<>();
List<String> skippedSegments = new ArrayList<>();
try {
String segmentInfosSnapshotFilename = null;
Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (localSegmentFiles.contains(file)) {
storeDirectory.deleteFile(file);
}
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
downloadedSegments.add(file);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file";
segmentInfosSnapshotFilename = file;
}
} else {
skippedSegments.add(file);
}
}
if (segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
logger.info("Downloaded segments: {}", downloadedSegments);
logger.info("Skipped download for segments: {}", skippedSegments);
store.decRef();
remoteStore.decRef();
}
}
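
As an aside, the chain of getDelegate() calls in the method above peels two FilterDirectory wrappers off Store.directory() before reaching the RemoteSegmentStoreDirectory. The following is a minimal, self-contained sketch of that unwrapping pattern, using generic in-memory directories rather than the real store wiring (the actual nesting is established when the Store is created for a remote-store-enabled index):

import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;

public class UnwrapDirectorySketch {
    public static void main(String[] args) throws Exception {
        // Mimic the nesting asserted above: an innermost directory wrapped twice.
        Directory innermost = new ByteBuffersDirectory();
        Directory cachingLayer = new FilterDirectory(innermost) {
        };
        Directory storeLayer = new FilterDirectory(cachingLayer) {
        };

        // Peel the wrappers one level at a time, as syncSegmentsFromRemoteSegmentStore does.
        Directory level1 = ((FilterDirectory) storeLayer).getDelegate();
        Directory level2 = ((FilterDirectory) level1).getDelegate();
        System.out.println(level2 == innermost); // prints: true
        innermost.close();
    }
}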

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
return true;
} else {
logger.warn("Checksum mismatch between local and remote segment file: {}, will override local file", file);
}
} catch (NoSuchFileException | FileNotFoundException e) {
logger.debug("File {} does not exist in local FS, downloading from remote store", file);
} catch (IOException e) {
logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file);
}
return false;
}
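
For readers unfamiliar with Lucene checksum footers, here is a standalone sketch (hypothetical, not part of this change) of the mechanism behind the comparison in localDirectoryContains: a file written with a codec footer carries a checksum that CodecUtil.retrieveChecksum validates and returns.

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

public class ChecksumSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new ByteBuffersDirectory()) {
            // Write a file ending in a standard Lucene codec footer; the footer
            // records the checksum of everything written before it.
            try (IndexOutput out = dir.createOutput("_sketch.dat", IOContext.DEFAULT)) {
                out.writeString("segment payload");
                CodecUtil.writeFooter(out);
            }
            // retrieveChecksum validates the footer and returns the stored checksum.
            // localDirectoryContains compares this value against the checksum recorded
            // in the remote segment store's metadata for the same file.
            try (IndexInput in = dir.openInput("_sketch.dat", IOContext.DEFAULT)) {
                System.out.println("footer checksum: " + CodecUtil.retrieveChecksum(in));
            }
        }
    }
}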

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this shard
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -19,12 +19,16 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
@@ -44,6 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
@@ -88,46 +95,67 @@ public void afterRefresh(boolean didRefresh) {
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
// If a new segments_N file is present locally but has not yet been uploaded to the remote
// store, this is considered the first refresh after a commit, and a cleanup of stale commit
// files is triggered. This avoids running a delete after every refresh.
// Ideally, we want this to be done in an async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
// SegmentInfosSnapshot is a snapshot of the reader's view of segments and may not contain
// all the segments from the last commit if they were merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit, and the following
// statement makes sure that each metadata file always contains all the segments from the
// last commit plus the refreshed segments.
localSegmentsPostRefresh.addAll(
SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)
);
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);
.forEach(localSegmentsPostRefresh::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (uploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

remoteDirectory.uploadMetadata(
refreshedLocalFiles,
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
@@ -141,6 +169,39 @@ public void afterRefresh(boolean didRefresh) {
}
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
// We use lastRefreshedCheckpoint as the local checkpoint for the SegmentInfosSnapshot. This is better than using
// getProcessedLocalCheckpoint(), as the processed checkpoint can advance between reading the value and setting it
// in SegmentInfos.userData. That could lead to data loss because, during recovery, the translog is replayed based on
// LOCAL_CHECKPOINT_KEY.
// lastRefreshedCheckpoint is updated after the refresh listeners are executed, which means
// InternalEngine.lastRefreshedCheckpoint() returns the checkpoint of the previous refresh, but that does not impact
// correctness as duplicate sequence numbers will not be replayed.
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ indexShard.getEngine().getClass();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
segmentInfosSnapshot.write(indexOutput);
}
storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
return segmentInfoSnapshotFilename;
}
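
The snapshot file name written above encodes the commit generation after a double underscore, and syncSegmentsFromRemoteSegmentStore recovers it with split("__")[1]. A standalone sketch of that round trip, with a hypothetical generation value:

public class SnapshotFilenameSketch {
    // Mirrors SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX above.
    static final String PREFIX = "segment_infos_snapshot_filename";

    public static void main(String[] args) {
        long commitGeneration = 42; // hypothetical generation
        // Encode, as in uploadSegmentInfosSnapshot.
        String filename = PREFIX + "__" + commitGeneration;
        // Decode, as in syncSegmentsFromRemoteSegmentStore.
        long parsed = Long.parseLong(filename.split("__")[1]);
        System.out.println(filename + " -> generation " + parsed); // prints generation 42
    }
}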

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);