From 026ab3642e66c0619abfd67f2da1a6baf5bf8d3a Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 9 Jan 2023 11:10:02 +0530 Subject: [PATCH] Add support for refresh level durability (#5749) Signed-off-by: Sachin Kale --- CHANGELOG.md | 1 + .../index/engine/InternalEngine.java | 2 +- .../opensearch/index/shard/IndexShard.java | 103 ++++++++++++++++++ .../shard/RemoteStoreRefreshListener.java | 87 ++++++++++++--- .../opensearch/index/shard/StoreRecovery.java | 21 +--- .../store/RemoteSegmentStoreDirectory.java | 29 +++-- .../index/shard/IndexShardTests.java | 19 +++- .../RemoteStoreRefreshListenerTests.java | 19 ++++ .../SegmentReplicationIndexShardTests.java | 89 +++++++++++++++ .../RemoteSegmentStoreDirectoryTests.java | 27 +++++ ...enSearchIndexLevelReplicationTestCase.java | 39 ++++++- .../index/shard/IndexShardTestCase.java | 21 ++-- 12 files changed, 403 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4676779f0fa7..9d6f439b02998 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) - Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429)) +- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253)) ### Dependencies - Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148)) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index a8b00c9ed8504..e9c61b3ee3766 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2932,7 +2932,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) { /** * Returned the last local checkpoint value has been refreshed internally. 
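+ * The method is made public so that {@link org.opensearch.index.shard.RemoteStoreRefreshListener} can stamp
+ * this checkpoint into the SegmentInfos snapshot it uploads to the remote segment store.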
*/ - final long lastRefreshedCheckpoint() { + public final long lastRefreshedCheckpoint() { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a800f26a58c08..596cf28c5e94e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -35,6 +35,7 @@ import com.carrotsearch.hppc.ObjectLongMap; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; @@ -48,6 +49,12 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; @@ -89,6 +96,7 @@ import org.opensearch.common.util.concurrent.AsyncIOProcessor; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.gateway.WriteStateException; @@ -144,6 +152,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -171,10 +180,12 @@ import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -202,8 +213,10 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; /** * An OpenSearch index shard @@ -2023,6 +2036,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t synchronized (engineMutex) { assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); + if (indexSettings.isRemoteStoreEnabled()) { + syncSegmentsFromRemoteSegmentStore(false); + } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). 
final Engine newEngine = engineFactory.newReadWriteEngine(config);
 onNewEngine(newEngine);
@@ -4132,6 +4148,9 @@ public void close() throws IOException {
 }
 };
 IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+ if (indexSettings.isRemoteStoreEnabled()) {
+ syncSegmentsFromRemoteSegmentStore(false);
+ }
 newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
 onNewEngine(newEngineReference.get());
 }
@@ -4157,6 +4176,90 @@ public void close() throws IOException {
 onSettingsChanged();
 }

+ /**
+ * Downloads segments from the remote segment store.
+ * @param overrideLocal flag to override local segment files with those in the remote store
+ * @throws IOException if an exception occurs while reading segments from the remote store
+ */
+ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
+ assert indexSettings.isRemoteStoreEnabled();
+ logger.info("Downloading segments from remote segment store");
+ assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
+ FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
+ assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
+ : "Store.directory is not enclosing an instance of FilterDirectory";
+ FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
+ final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
+ // We need to call RemoteSegmentStoreDirectory.init() in order to get the latest metadata of the files that
+ // are uploaded to the remote segment store.
+ assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
+ ((RemoteSegmentStoreDirectory) remoteDirectory).init();
+ Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
+ .getSegmentsUploadedToRemoteStore();
+ final Directory storeDirectory = store.directory();
+ store.incRef();
+ remoteStore.incRef();
+ List<String> downloadedSegments = new ArrayList<>();
+ List<String> skippedSegments = new ArrayList<>();
+ try {
+ String segmentInfosSnapshotFilename = null;
+ Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll());
+ for (String file : uploadedSegments.keySet()) {
+ long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
+ if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
+ if (localSegmentFiles.contains(file)) {
+ storeDirectory.deleteFile(file);
+ }
+ storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
+ downloadedSegments.add(file);
+ if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
+ assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file";
+ segmentInfosSnapshotFilename = file;
+ }
+ } else {
+ skippedSegments.add(file);
+ }
+ }
+ if (segmentInfosSnapshotFilename != null) {
+ try (
+ ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
+ storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
+ )
+ ) {
+ SegmentInfos infosSnapshot = SegmentInfos.readCommit(
+ store.directory(),
+ indexInput,
+ Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
+ );
+ long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
+ store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
+ }
+ }
+ } catch (IOException e) {
+ throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
+ } finally {
+ logger.info("Downloaded segments: {}", downloadedSegments);
+ logger.info("Skipped download for segments: {}", skippedSegments);
+ store.decRef();
+ remoteStore.decRef();
+ }
+ }
+
+ private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
+ try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
+ if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
+ return true;
+ } else {
+ logger.warn("Checksum mismatch between local and remote segment file: {}, will override local file", file);
+ }
+ } catch (NoSuchFileException | FileNotFoundException e) {
+ logger.debug("File {} does not exist in local FS, downloading from remote store", file);
+ } catch (IOException e) {
+ logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file);
+ }
+ return false;
+ }
+
 /**
 * Returns the maximum sequence number of either update or delete operations have been processed in this shard
 * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index a8ca9891d9743..838cbc7f25e09 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -19,12 +19,16 @@
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.index.engine.EngineException;
+import org.opensearch.index.engine.InternalEngine;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;

 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +38,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

+import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
+
 /**
 * RefreshListener implementation to upload newly created segment files to the remote store
 *
@@ -44,6 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
 static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
 // Visible for testing
 static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
+ static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

 private final IndexShard indexShard;
 private final Directory storeDirectory;
@@ -88,46 +95,67 @@ public void afterRefresh(boolean didRefresh) {
 this.remoteDirectory.init();
 }
 try {
- String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
- if (!remoteDirectory.containsFile(
- lastCommittedLocalSegmentFileName,
- getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
- )) {
+ // If a new segments_N file is present locally that has not yet been uploaded to the remote store,
+ // this refresh is treated as the first refresh after a commit, and a cleanup of stale commit files
+ // is triggered. This is done to avoid issuing deletes on the remote store after every refresh.
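+ // The cleanup retains only the most recent metadata files in the remote store
+ // (see LAST_N_METADATA_FILES_TO_KEEP above).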
+ // Ideally, we want this to be done in an async flow. (GitHub issue #4315)
+ if (isRefreshAfterCommit()) {
 deleteStaleCommits();
 }
+
+ String segmentInfoSnapshotFilename = null;
 try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
 SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
- Collection<String> refreshedLocalFiles = segmentInfos.files(true);
- List<String> segmentInfosFiles = refreshedLocalFiles.stream()
+ Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);
+
+ List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
 .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
 .collect(Collectors.toList());
 Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
- .max(Comparator.comparingLong(IndexFileNames::parseGeneration));
+ .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
 if (latestSegmentInfos.isPresent()) {
- refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
+ // SegmentInfosSnapshot is a snapshot of the reader's view of segments and may not contain
+ // all the segments from the last commit if they have been merged away but are not yet committed.
+ // Each metadata file in the remote segment store represents a commit, and the following
+ // statement makes sure that each metadata file always contains all the segments from the
+ // last commit plus the refreshed segments.
+ localSegmentsPostRefresh.addAll(
+ SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)
+ );
 segmentInfosFiles.stream()
 .filter(file -> !file.equals(latestSegmentInfos.get()))
- .forEach(refreshedLocalFiles::remove);
+ .forEach(localSegmentsPostRefresh::remove);

- boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
+ boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
 if (uploadStatus) {
+ segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
+ localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);
+
 remoteDirectory.uploadMetadata(
- refreshedLocalFiles,
+ localSegmentsPostRefresh,
 storeDirectory,
 indexShard.getOperationPrimaryTerm(),
 segmentInfos.getGeneration()
 );
 localSegmentChecksumMap.keySet()
 .stream()
- .filter(file -> !refreshedLocalFiles.contains(file))
+ .filter(file -> !localSegmentsPostRefresh.contains(file))
 .collect(Collectors.toSet())
 .forEach(localSegmentChecksumMap::remove);
 }
 }
 } catch (EngineException e) {
 logger.warn("Exception while reading SegmentInfosSnapshot", e);
+ } finally {
+ try {
+ if (segmentInfoSnapshotFilename != null) {
+ storeDirectory.deleteFile(segmentInfoSnapshotFilename);
+ }
+ } catch (IOException e) {
+ logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
+ }
 }
 } catch (IOException e) {
 // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
@@ -141,6 +169,39 @@ public void afterRefresh(boolean didRefresh) {
 }
 }

+ private boolean isRefreshAfterCommit() throws IOException {
+ String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
+ return (lastCommittedLocalSegmentFileName != null
+ && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
+ }
+
+ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
+ // We use lastRefreshedCheckpoint as the local checkpoint for the SegmentInfosSnapshot. This is better than using
+ // getProcessedLocalCheckpoint(), as the processed checkpoint can advance between reading the value and setting it
+ // in SegmentInfos.userData. That could lead to data loss because, during recovery, the translog is replayed based on
+ // LOCAL_CHECKPOINT_KEY.
+ // lastRefreshedCheckpoint is updated after the refresh listeners are executed, which means
+ // InternalEngine.lastRefreshedCheckpoint() returns the checkpoint of the last refresh. This does not impact
+ // correctness, as duplicate sequence numbers will not be replayed.
+ assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ + indexShard.getEngine().getClass();
+ final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
+
+ Map<String, String> userData = segmentInfosSnapshot.getUserData();
+ userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
+ userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
+ segmentInfosSnapshot.setUserData(userData, false);
+
+ long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
+ String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
+ try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
+ segmentInfosSnapshot.write(indexOutput);
+ }
+ storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
+ remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
+ return segmentInfoSnapshotFilename;
+ }
+
 // Visible for testing
 boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
 AtomicBoolean uploadSuccess = new AtomicBoolean(true);
diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
index 6ca5036808818..ea71520ff561e 100644
--- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -446,26 +446,13 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
 }
 indexShard.preRecovery();
 indexShard.prepareForIndexRecovery();
- assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
- FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
- assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
- : "Store.directory is not enclosing an instance of FilterDirectory";
- FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
- final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
 final Store store = indexShard.store();
- final Directory storeDirectory = store.directory();
 store.incRef();
 remoteStore.incRef();
 try {
- // Cleaning up local directory before copying file from remote directory.
- // This is done to make sure we start with clean slate.
- // ToDo: Check if we can copy only missing files - for (String file : storeDirectory.listAll()) { - storeDirectory.deleteFile(file); - } - for (String file : remoteDirectory.listAll()) { - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - } + // Download segments from remote segment store + indexShard.syncSegmentsFromRemoteSegmentStore(true); + // This creates empty trans-log for now // ToDo: Add code to restore from remote trans-log bootstrap(indexShard, store); @@ -475,7 +462,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from remote_store"); - } catch (IOException e) { + } catch (IOException | IndexShardRecoveryException e) { throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e); } finally { store.decRef(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 505ad6fafd550..3b36d3777a712 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -66,7 +66,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory { * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix. */ - private String metadataFileUniqueSuffix; + private String commonFilenameSuffix; /** * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. @@ -92,7 +92,7 @@ public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDi * @throws IOException if there were any failures in reading the metadata file */ public void init() throws IOException { - this.metadataFileUniqueSuffix = UUIDs.base64UUID(); + this.commonFilenameSuffix = UUIDs.base64UUID(); this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile()); } @@ -154,6 +154,10 @@ public String toString() { return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum); } + public String getChecksum() { + return this.checksum; + } + public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); return new UploadedSegmentMetadata(values[0], values[1], values[2]); @@ -293,17 +297,26 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + String remoteFilename; + if (useCommonSuffix) { + remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; + } else { + remoteFilename = getNewRemoteSegmentFilename(dest); + } + remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + String checksum = getChecksumOfLocalFile(from, src); + UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); + segmentsUploadedToRemoteStore.put(src, segmentMetadata); + } + /** * Copies an existing src file from directory from to a non-existent file dest in this directory. 
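+ * Delegates to the five-argument {@code copyFrom} with {@code useCommonSuffix} set to {@code false},
+ * so the remote filename is generated via {@code getNewRemoteSegmentFilename(dest)}.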
* Once the segment is uploaded to remote segment store, update the cache accordingly. */ @Override public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { - String remoteFilename = getNewRemoteSegmentFilename(dest); - remoteDataDirectory.copyFrom(from, src, remoteFilename, context); - String checksum = getChecksumOfLocalFile(from, src); - UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum); - segmentsUploadedToRemoteStore.put(src, segmentMetadata); + copyFrom(from, src, dest, context, false); } /** @@ -330,7 +343,7 @@ public boolean containsFile(String localFilename, String checksum) { public void uploadMetadata(Collection segmentFiles, Directory storeDirectory, long primaryTerm, long generation) throws IOException { synchronized (this) { - String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix); + String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.commonFilenameSuffix); IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); Map uploadedSegments = new HashMap<>(); for (String file : segmentFiles) { diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 84b7f6007dc11..163f60e347b5f 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2675,10 +2675,21 @@ public void restoreShard( closeShards(target); } - public void testRestoreShardFromRemoteStore() throws IOException { + public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException { + testRestoreShardFromRemoteStore(false); + } + + public void testCommitLevelRestoreShardFromRemoteStore() throws IOException { + testRestoreShardFromRemoteStore(true); + } + + public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException { IndexShard target = newStartedShard( true, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(), new InternalEngineFactory() ); @@ -2686,7 +2697,9 @@ public void testRestoreShardFromRemoteStore() throws IOException { indexDoc(target, "_doc", "2"); target.refresh("test"); assertDocs(target, "1", "2"); - flushShard(target); + if (performFlush) { + flushShard(target); + } ShardRouting routing = ShardRoutingHelper.initWithSameId( target.routingEntry(), diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 6b05d67836272..c9b8c023e26aa 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -30,6 +31,8 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; +import static 
org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
+
 public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
 private IndexShard indexShard;
 private RemoteStoreRefreshListener remoteStoreRefreshListener;
@@ -204,13 +207,29 @@ public void onFailure(Exception e) {
 private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException {
 Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
 .getSegmentsUploadedToRemoteStore();
+ String segmentsNFilename = null;
 try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
 SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
 for (String file : segmentInfos.files(true)) {
 if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) {
 assertTrue(uploadedSegments.containsKey(file));
 }
+ if (file.startsWith(IndexFileNames.SEGMENTS)) {
+ segmentsNFilename = file;
+ }
 }
 }
+ if (segmentsNFilename != null) {
+ String commitGeneration = segmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length());
+ assertTrue(
+ uploadedSegments.keySet()
+ .stream()
+ .anyMatch(
+ s -> s.startsWith(
+ SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + Long.parseLong(commitGeneration, Character.MAX_RADIX)
+ )
+ )
+ );
+ }
 }
}
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 78e880b4c9833..11fae987174f6 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -385,6 +385,95 @@ public void testReplicaRestarts() throws Exception {
 }
 }

+ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshRefresh() throws Exception {
+ testNRTReplicaWithRemoteStorePromotedAsPrimary(false, false);
+ }
+
+ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshCommit() throws Exception {
+ testNRTReplicaWithRemoteStorePromotedAsPrimary(false, true);
+ }
+
+ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitRefresh() throws Exception {
+ testNRTReplicaWithRemoteStorePromotedAsPrimary(true, false);
+ }
+
+ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws Exception {
+ testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true);
+ }
+
+ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception {
+ Settings settings = Settings.builder()
+ .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+ .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+ .build();
+
+ try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
+ shards.startAll();
+ IndexShard oldPrimary = shards.getPrimary();
+ final IndexShard nextPrimary = shards.getReplicas().get(0);
+
+ // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point.
+ final int numDocs = shards.indexDocs(randomInt(10));
+
+ // refresh but do not copy the segments over.
+ if (performFlushFirst) {
+ flushShard(oldPrimary, true);
+ } else {
+ oldPrimary.refresh("Test");
+ }
+ // replicateSegments(primary, shards.getReplicas());
+
+ // at this point both shards should have numDocs persisted and searchable.
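+ // Note: segments are not copied to the replicas here (replicateSegments above is intentionally
+ // commented out), so the replicas have the ops persisted but not yet searchable.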
+ assertDocCounts(oldPrimary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, 0); + } + + // 2. Create ops that are in the replica's xlog, not in the index. + // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs + // persisted. + final int additonalDocs = shards.indexDocs(randomInt(10)); + final int totalDocs = numDocs + additonalDocs; + + if (performFlushSecond) { + flushShard(oldPrimary, true); + } else { + oldPrimary.refresh("Test"); + } + assertDocCounts(oldPrimary, totalDocs, totalDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, totalDocs, 0); + } + assertTrue(nextPrimary.translogStats().estimatedNumberOfOperations() >= additonalDocs); + assertTrue(nextPrimary.translogStats().getUncommittedOperations() >= additonalDocs); + + // promote the replica + shards.promoteReplicaToPrimary(nextPrimary).get(); + + // close oldPrimary. + oldPrimary.close("demoted", false); + oldPrimary.store().close(); + + assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); + assertDocCounts(nextPrimary, totalDocs, totalDocs); + + // As we are downloading segments from remote segment store on failover, there should not be + // any operations replayed from translog + assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations()); + + // refresh and push segments to our other replica. + nextPrimary.refresh("test"); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } + } + public void testNRTReplicaPromotedAsPrimary() throws Exception { try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { shards.startAll(); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 96f14616fb54b..3f1c20f0d88a5 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -317,6 +317,33 @@ public void testCopyFromException() throws IOException { storeDirectory.close(); } + public void testCopyFromOverride() throws IOException { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true); + RemoteSegmentStoreDirectory.UploadedSegmentMetadata uploadedSegmentMetadata = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore() + .get(filename); + assertNotNull(uploadedSegmentMetadata); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT, true); + assertEquals( + uploadedSegmentMetadata.toString(), + 
remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get(filename).toString() + ); + + storeDirectory.close(); + } + public void testContainsFile() throws IOException { List metadataFiles = List.of("metadata__1__5__abc"); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index b3f062aef4fbe..45a07dc2800d0 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -97,6 +97,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -105,6 +106,7 @@ import org.opensearch.threadpool.ThreadPool.Names; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -145,9 +147,14 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa } protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory) + throws IOException { + return createGroup(replicas, settings, mappings, engineFactory, null); + } + + protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath) throws IOException { IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings); - return new ReplicationGroup(metadata) { + return new ReplicationGroup(metadata, remotePath) { @Override protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; @@ -234,13 +241,29 @@ protected class ReplicationGroup implements AutoCloseable, Iterable ); protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException { + this(indexMetadata, null); + } + + protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer, null); + Store remoteStore = null; + if (remotePath != null) { + remoteStore = createRemoteStore(remotePath, primaryRouting, indexMetadata); + } + primary = newShard( + primaryRouting, + indexMetadata, + null, + getEngineFactory(primaryRouting), + () -> {}, + retentionLeaseSyncer, + remoteStore + ); replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; updateAllocationIDsOnPrimary(); for (int i = 0; i < indexMetadata.getNumberOfReplicas(); i++) { - addReplica(); + addReplica(remotePath); } } @@ -355,7 +378,15 @@ public void startPrimary() throws IOException { } public IndexShard addReplica() throws IOException { + return addReplica((Path) null); + } + + public IndexShard addReplica(Path remotePath) throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); + Store 
remoteStore = null; + if (remotePath != null) { + remoteStore = createRemoteStore(remotePath, replicaRouting, indexMetadata); + } final IndexShard replica = newShard( replicaRouting, indexMetadata, @@ -363,7 +394,7 @@ public IndexShard addReplica() throws IOException { getEngineFactory(replicaRouting), () -> {}, retentionLeaseSyncer, - null + remoteStore ); addReplica(replica); return replica; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index dad371614e1ac..34d09d5bdebfb 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -542,14 +542,7 @@ protected IndexShard newShard( clusterSettings ); if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { - ShardId shardId = shardPath.getShardId(); - NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(createTempDir()); - ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); - RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); - storeProvider = is -> createStore(shardId, is, remoteSegmentStoreDirectory); - remoteStore = storeProvider.apply(indexSettings); + remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); } indexShard = new IndexShard( routing, @@ -585,6 +578,18 @@ protected IndexShard newShard( return indexShard; } + protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { + Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); + + ShardId shardId = new ShardId("index", "_na_", 0); + NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path); + ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); + RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory); + return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory); + } + private RemoteDirectory newRemoteDirectory(Path f) throws IOException { FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false); BlobPath blobPath = new BlobPath();
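
For reference, below is a minimal standalone sketch (not part of the patch) of the SegmentInfos-snapshot filename convention the patch introduces. The prefix constant and the "__" separator mirror RemoteStoreRefreshListener#uploadSegmentInfosSnapshot and the parsing in IndexShard#syncSegmentsFromRemoteSegmentStore; the class name and main driver are illustrative only.

    // Illustrative sketch: the snapshot filename is SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration,
    // and the commit generation is recovered with Long.parseLong(filename.split("__")[1]).
    public class SegmentInfosSnapshotFilenameDemo {
        static final String PREFIX = "segment_infos_snapshot_filename";

        static String toFilename(long commitGeneration) {
            return PREFIX + "__" + commitGeneration;
        }

        static long commitGenerationOf(String filename) {
            // Safe because the prefix contains no double underscore.
            return Long.parseLong(filename.split("__")[1]);
        }

        public static void main(String[] args) {
            String name = toFilename(42);
            System.out.println(name);                      // segment_infos_snapshot_filename__42
            System.out.println(commitGenerationOf(name));  // 42
        }
    }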